Skip to content

Commit

Permalink
Tasks plugin: async queue helpers (#354)
Browse files Browse the repository at this point in the history
* Tasks plugin: async tasks example

* fix clippy warning

* cargo fmt
  • Loading branch information
Wulf authored Sep 29, 2023
1 parent 7ceab89 commit d49e608
Show file tree
Hide file tree
Showing 12 changed files with 335 additions and 62 deletions.
21 changes: 16 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 7 additions & 6 deletions create-rust-app/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ utoipa = { optional = true, version = "3", features = [
] }

# plugin_tasks
fang = { optional = true, version = "0.10.3" }
fang = { optional = true, version = "0.10.4", features = ["asynk"] }

##
## BACKENDS
Expand Down Expand Up @@ -120,9 +120,9 @@ mime_guess = { optional = true, version = "2.0.4" } # backend_poem, plugin_stora
anyhow = { optional = true, version = "1.0.71" } # backend_poem, plugin_auth, plugin_dev
tokio = { optional = true, version = "1", features = [
"full",
] } # backend_poem, backend_axum, plugin_storage
async-priority-channel = "0.1.0"
futures-util = { optional = true, version = "0.3.28" } # plugin_dev, TODO:plugin_storage?
] } # backend_poem, backend_axum, plugin_storage, plugin_tasks
async-priority-channel = "0.1.0" # plugin_dev
futures-util = { optional = true, version = "0.3.28" } # plugin_dev, plugin_storage

[features]
default = [
Expand Down Expand Up @@ -169,10 +169,11 @@ plugin_storage = [
"md5",
"mime_guess",
"base64",
] # note: might need to add "futures-util"?
"futures-util"
]
plugin_graphql = []
plugin_utoipa = ["utoipa", "backend_actix-web"]
plugin_tasks = ["fang"]
plugin_tasks = ["fang", "tokio"]
backend_poem = ["poem", "anyhow", "mime_guess", "tokio"]
backend_actix-web = [
"actix-web",
Expand Down
4 changes: 2 additions & 2 deletions create-rust-app/src/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ impl Database {
POOL.get_or_init(|| {
Pool::builder()
.connection_timeout(std::time::Duration::from_secs(5))
.build(ConnectionManager::<DbCon>::new(Self::database_url()))
.build(ConnectionManager::<DbCon>::new(Self::connection_url()))
.unwrap()
})
}

fn database_url() -> String {
pub fn connection_url() -> String {
std::env::var("DATABASE_URL").expect("DATABASE_URL environment variable expected.")
}
}
42 changes: 41 additions & 1 deletion create-rust-app/src/tasks/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
use crate::Database;
use fang::Queue;
use fang::{AsyncQueue, NoTls, Queue};
use once_cell::sync::OnceCell;
use std::sync::Mutex;
// re-export setup for tasks
pub use crate::setup;

/// Returns a reference to the synchronous queue.
///
/// Tasks scheduled in this queue will be executed by the synchronous queue process.
/// A sync queue process can be started by running the `queue` binary.
pub fn queue() -> &'static Queue {
#[cfg(debug_assertions)]
crate::load_env_vars();
Expand All @@ -16,3 +21,38 @@ pub fn queue() -> &'static Queue {
Queue::builder().connection_pool(db.pool.clone()).build()
})
}

/// Returns a reference to the async queue.
///
/// Tasks scheduled in this queue will be executed by the async queue process.
/// An async queue process can be started by running the `async_queue` binary (this is the default for newly generated projects).
///
/// Make sure you connect this queue to your db at the start of your app:
/// ```rust
/// let queue = async_queue();
/// queue.lock().unwrap().connect(NoTls).await.expect("Failed to connect to queue DB");
/// ```
pub fn async_queue() -> &'static Mutex<AsyncQueue<NoTls>> {
static ASYNC_QUEUE: OnceCell<Mutex<AsyncQueue<NoTls>>> = OnceCell::new();

ASYNC_QUEUE.get_or_init(|| {
Mutex::new(create_async_queue(10 /* r2d2's default */))
})
}

/// Creates a new async queue with the specified max pool size.
/// You should not need to use this function directly.
/// Instead, use `async_queue()` to get a reference to the queue.
///
/// This function is public because it is used by the `async_queue` binary in generated projects.
/// You can also use it to create a separate async queue bin for your own purposes.
/// For example, to support a different async task type.
pub fn create_async_queue(max_pool_size: u32) -> AsyncQueue<NoTls> {
#[cfg(debug_assertions)]
crate::load_env_vars();

AsyncQueue::builder()
.uri(Database::connection_url())
.max_pool_size(max_pool_size)
.build()
}
103 changes: 73 additions & 30 deletions create-rust-app_cli/src/content/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,24 @@ fn get_current_cra_lib_version() -> String {
"9".to_string()
}

#[derive(Clone)]
struct ProjectBinary {
name: &'static str,
path: &'static str,
}

fn is_restricted_project_name(project_name: &str, project_binaries: &[ProjectBinary]) -> bool {
project_binaries
.iter()
.map(|bin| bin.name)
.collect::<Vec<&str>>()
.contains(&project_name)
}

fn add_bins_to_cargo_toml(
project_dir: &std::path::PathBuf,
creations_options: &CreationOptions,
) -> Result<(), std::io::Error> {
) -> Result<()> {
let mut path = std::path::PathBuf::from(project_dir);
path.push("Cargo.toml");

Expand Down Expand Up @@ -80,45 +94,66 @@ fn add_bins_to_cargo_toml(

let updated_toml = toml::to_string(&parsed_toml).unwrap();

let queue_bin = if creations_options
let mut project_binaries: Vec<ProjectBinary> = vec![
ProjectBinary {
name: "fullstack",
path: ".cargo/bin/fullstack.rs",
},
ProjectBinary {
name: "tsync",
path: ".cargo/bin/tsync.rs",
},
ProjectBinary {
name: "dsync",
path: ".cargo/bin/dsync.rs",
},
ProjectBinary {
name: "backend",
path: ".cargo/bin/backend.rs",
},
ProjectBinary {
name: "frontend",
path: ".cargo/bin/frontend.rs",
},
];

if creations_options
.cra_enabled_features
.contains(&"plugin_tasks".to_string())
{
r#"
[[bin]]
name = "queue"
path = "backend/queue.rs"
"#
} else {
""
project_binaries.push(ProjectBinary {
name: "queue",
path: ".cargo/bin/queue.rs",
});
project_binaries.push(ProjectBinary {
name: "async_queue",
path: ".cargo/bin/async_queue.rs",
});
};

let binaries_cargo_toml_string = project_binaries
.clone()
.iter()
.map(|bin| {
format!(
r#"[[bin]]
name = "{name}"
path = "{path}"
"#,
name = bin.name,
path = bin.path
)
})
.collect::<Vec<String>>()
.join("\n");

let append_to_toml = format!(
r#"
[[bin]]
name = "fullstack"
path = ".cargo/bin/fullstack.rs"
[[bin]]
name = "tsync"
path = ".cargo/bin/tsync.rs"
[[bin]]
name = "dsync"
path = ".cargo/bin/dsync.rs"
[[bin]]
name = "backend"
path = ".cargo/bin/backend.rs"
[[bin]]
name = "frontend"
path = ".cargo/bin/frontend.rs"
{binaries_cargo_toml_string}
[[bin]]
name = "{project_name}"
path = "backend/main.rs"
{queue_bin}
[profile.dev]
debug-assertions=true
"#
Expand All @@ -131,6 +166,14 @@ debug-assertions=true

std::fs::write(&path, final_toml)?;

// check if the project name is valid
if is_restricted_project_name(&project_name, &project_binaries) {
logger::error(&format!(
"Invalid project name: '{project_name}' (running `cargo run --bin {project_name}` is used by a binary generated by create-rust-app)."
));
return Err(anyhow::anyhow!("Invalid project name"));
}

Ok(())
}

Expand Down
14 changes: 14 additions & 0 deletions create-rust-app_cli/src/plugins/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,20 @@ pub trait Plugin {
install_config.backend_framework,
)?;

logger::command_msg("cargo fmt");

let cargo_fmt = std::process::Command::new("cargo")
.current_dir(".")
.arg("fmt")
.stdout(std::process::Stdio::null())
.status()
.expect("failed to execute process");

if !cargo_fmt.success() {
logger::error("Failed to execute `cargo fmt`");
std::process::exit(1);
}

logger::command_msg("git add -A");

let git_add = std::process::Command::new("git")
Expand Down
Loading

0 comments on commit d49e608

Please sign in to comment.