Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: manage cache and init scripts from worker group UI #2396

Merged
merged 7 commits into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,6 @@ it being synced automatically everyday.
| WORKER_GROUP | default | The worker group the worker belongs to and get its configuration pulled from | Worker |
| SERVER_BIND_ADDR | 0.0.0.0 | IP Address on which to bind listening socket | Server |
| PORT | 8000 | Exposed port | Server |
| NUM_WORKERS | 1 | The number of worker per Worker instance (Set to 0 for API/Server instances, Set to 1 for normal workers, and > 1 for workers dedicated to native jobs) | Worker |
| DISABLE_SERVER | false | Disable the external API, operate as a worker only instance | Worker |
| METRICS_ADDR | None | (ee only) The socket addr at which to expose Prometheus metrics at the /metrics path. Set to "true" to expose it on port 8001 | All |
| JSON_FMT | false | Output the logs in json format instead of logfmt | All |
Expand Down Expand Up @@ -301,6 +300,7 @@ it being synced automatically everyday.
| PIP_LOCAL_DEPENDENCIES | None | Specify dependencies that are installed locally and do not need to be solved nor installed again | |
| ADDITIONAL_PYTHON_PATHS | None | Specify python paths (separated by a :) to be appended to the PYTHONPATH of the python jobs. To be used with PIP_LOCAL_DEPENDENCIES to use python codebases within Windmill | Worker |
| INCLUDE_HEADERS | None | Whitelist of headers that are passed to jobs as args (separated by a comma) | Server |
| NUM_WORKERS | 1 | The number of workers per Worker instance (Set to 0 for API/Server instances, Set to 1 for normal workers, and > 1 for workers dedicated to native jobs) | Worker |
| INSTANCE_EVENTS_WEBHOOK | None | Webhook to notify of events such as new user added, signup/invite. Can hook back to windmill to send emails |
| GLOBAL_CACHE_INTERVAL | 10\*60 | (Enterprise Edition only) Interval in seconds in between bucket sync of the cache. This interval \* 2 is the time at which you're guaranteed all the worker's caches are synced together. | Worker |
| WORKER_TAGS | 'deno,go,python3,bash,flow,hub,dependency' | The worker tags assigned to that worker | Worker |
Expand Down
15 changes: 8 additions & 7 deletions backend/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,7 @@ Windmill Community Edition {GIT_VERSION}

let mut rx = rx.resubscribe();
let base_internal_url = base_internal_url.to_string();
let rd_delay = rand::thread_rng().gen_range(0..30);
tokio::spawn(async move {
//monitor_db is applied at start, no need to apply it twice
tokio::time::sleep(Duration::from_secs(rd_delay)).await;

let h = tokio::spawn(async move {
let mut listener = retry_listen_pg(&db).await;

loop {
Expand All @@ -222,16 +218,18 @@ Windmill Community Edition {GIT_VERSION}
tracing::info!("Received new pg notification: {n:?}");
match n.channel() {
"notify_config_change" => {
tracing::info!("Config change detected: {}", n.payload());
match n.payload() {
"server" if server_mode => {
tracing::info!("Server config change detected: {}", n.payload());

reload_server_config(&db).await;
},
a@ _ if worker_mode && a == format!("worker__{}", *WORKER_GROUP) => {
tracing::info!("Worker config change detected: {}", n.payload());
reload_worker_config(&db, tx.clone(), true).await;
},
_ => {
tracing::error!("config target neither a server or a worker");
tracing::debug!("config changed but did not target this server/worker");
}
}
},
Expand Down Expand Up @@ -302,6 +300,9 @@ Windmill Community Edition {GIT_VERSION}
}
});

if let Err(e) = h.await {
tracing::error!("Error waiting for monitor handle:{e}")
}
Ok(()) as anyhow::Result<()>
};

Expand Down
25 changes: 21 additions & 4 deletions backend/src/monitor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::HashMap, fmt::Display, ops::Mul, str::FromStr, sync::Arc};
use std::{collections::HashMap, fmt::Display, ops::Mul, str::FromStr, sync::Arc, time::Duration};

use once_cell::sync::OnceCell;
use serde::de::DeserializeOwned;
Expand Down Expand Up @@ -467,9 +467,26 @@ pub async fn reload_worker_config(
let wc = WORKER_CONFIG.read().await;
let config = config.unwrap();
if *wc != config {
if kill_if_change && (*wc).dedicated_worker != config.dedicated_worker {
tracing::info!("Dedicated worker config changed, sending killpill. Expecting to be restarted by supervisor.");
let _ = tx.send(());
if kill_if_change {
if (*wc).dedicated_worker != config.dedicated_worker {
tracing::info!("Dedicated worker config changed, sending killpill. Expecting to be restarted by supervisor.");
let _ = tx.send(());
}

if (*wc).init_bash != config.init_bash {
tracing::info!("Init bash config changed, sending killpill. Expecting to be restarted by supervisor.");
let _ = tx.send(());
}

if (*wc).cache_clear != config.cache_clear {
tracing::info!("Cache clear changed, sending killpill. Expecting to be restarted by supervisor.");
let _ = tx.send(());
tracing::info!("Waiting 5 seconds to allow others workers to start potential jobs that depend on a potential shared cache volume");
tokio::time::sleep(Duration::from_secs(5)).await;
if let Err(e) = windmill_worker::common::clean_cache().await {
tracing::error!("Error cleaning the cache: {e}");
}
}
}
drop(wc);

Expand Down
4 changes: 2 additions & 2 deletions backend/windmill-api/src/oauth2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ use tower_cookies::{Cookie, Cookies};
use windmill_audit::{audit_log, ActionKind};
use windmill_common::db::UserDB;
use windmill_common::jobs::JobPayload;
use windmill_common::more_serde::maybe_number_opt;
use windmill_common::users::username_to_permissioned_as;
use windmill_common::utils::{not_found_if_none, now_from_db};
use windmill_common::more_serde::maybe_number_opt;

use crate::db::ApiAuthed;
use crate::saml::SamlSsoLogin;
Expand Down Expand Up @@ -659,7 +659,7 @@ pub async fn _refresh_token<'c>(

async fn _exchange_token(client: OClient, refresh_token: &str) -> Result<TokenResponse, Error> {
let token_json = client
.exchange_refresh_token(&RefreshToken::from(refresh_token.clone()))
.exchange_refresh_token(&RefreshToken::from(refresh_token))
.with_client(&HTTP_CLIENT)
.execute::<serde_json::Value>()
.await
Expand Down
2 changes: 1 addition & 1 deletion backend/windmill-common/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ impl UserDB {
.await?;

let (folders_write, folders_read): &(Vec<_>, Vec<_>) =
&authed.folders().clone().into_iter().partition(|x| x.1);
&authed.folders().into_iter().partition(|x| x.1);

let mut folders_read = folders_read.clone();
folders_read.extend(folders_write.clone());
Expand Down
42 changes: 41 additions & 1 deletion backend/windmill-common/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ lazy_static::lazy_static! {
pub static ref WORKER_CONFIG: Arc<RwLock<WorkerConfig>> = Arc::new(RwLock::new(WorkerConfig {
worker_tags: Default::default(),
dedicated_worker: Default::default(),
cache_clear: Default::default(),
init_bash: Default::default(),
additional_python_paths: Default::default(),
pip_local_dependencies: Default::default()
}));

pub static ref SERVER_CONFIG: Arc<RwLock<ServerConfig>> = Arc::new(RwLock::new(ServerConfig { smtp: Default::default(), timeout_wait_result: 20 }));
Expand Down Expand Up @@ -195,6 +199,26 @@ pub async fn load_worker_config(db: &DB) -> error::Result<WorkerConfig> {
})
.unwrap_or_else(|| DEFAULT_TAGS.clone()),
dedicated_worker,
init_bash: config
.init_bash
.or_else(|| std::env::var("INIT_SCRIPT").ok())
.and_then(|x| if x.is_empty() { None } else { Some(x) }),
cache_clear: config.cache_clear,
pip_local_dependencies: config.pip_local_dependencies.or_else(|| {
let pip_local_dependencies = std::env::var("PIP_LOCAL_DEPENDENCIES")
.ok()
.map(|x| x.split(',').map(|x| x.to_string()).collect());
if pip_local_dependencies == Some(vec!["".to_string()]) {
None
} else {
pip_local_dependencies
}
}),
additional_python_paths: config.additional_python_paths.or_else(|| {
std::env::var("ADDITIONAL_PYTHON_PATHS")
.ok()
.map(|x| x.split(':').map(|x| x.to_string()).collect())
}),
})
}

Expand All @@ -203,20 +227,36 @@ pub struct WorkspacedPath {
pub workspace_id: String,
pub path: String,
}

/// Raw, all-optional worker configuration as received from the worker-group
/// config channel. `None` fields fall back to environment variables or
/// built-in defaults when resolved into a `WorkerConfig` (see
/// `load_worker_config`).
///
/// All fields are `Option`, so the derived `Default` (all `None`) replaces
/// the previous hand-written impl with identical behavior.
#[derive(Serialize, Deserialize, Default)]
pub struct WorkerConfigOpt {
    // Queue tags this worker handles (WORKER_TAGS fallback).
    pub worker_tags: Option<Vec<String>>,
    // Script path this worker is dedicated to, if any; a change triggers a
    // killpill/restart (see reload_worker_config).
    pub dedicated_worker: Option<String>,
    // Startup script contents (INIT_SCRIPT fallback); a change triggers a
    // killpill/restart.
    pub init_bash: Option<String>,
    // Change marker: when this value changes the worker wipes its cache and
    // restarts (see reload_worker_config / clean_cache).
    pub cache_clear: Option<u32>,
    // Paths appended to PYTHONPATH for python jobs (ADDITIONAL_PYTHON_PATHS
    // fallback, ':'-separated).
    pub additional_python_paths: Option<Vec<String>>,
    // Dependencies installed locally and skipped during pip resolution
    // (PIP_LOCAL_DEPENDENCIES fallback, ','-separated).
    pub pip_local_dependencies: Option<Vec<String>>,
}

/// Fully-resolved worker configuration, built from a `WorkerConfigOpt` with
/// env-var fallbacks already applied (see `load_worker_config`).
/// `PartialEq` is required because reloads compare the old and new configs
/// (`*wc != config`) to decide whether a restart/cache wipe is needed.
#[derive(PartialEq, Debug, Clone)]
pub struct WorkerConfig {
    // Queue tags this worker handles.
    pub worker_tags: Vec<String>,
    // Workspace + path of the script this worker is dedicated to, if any.
    pub dedicated_worker: Option<WorkspacedPath>,
    // Startup script contents, if configured.
    pub init_bash: Option<String>,
    // Change marker driving cache wipes; only its *change* is meaningful.
    pub cache_clear: Option<u32>,
    // Paths appended to PYTHONPATH for python jobs.
    pub additional_python_paths: Option<Vec<String>>,
    // Dependencies treated as locally installed during pip resolution.
    pub pip_local_dependencies: Option<Vec<String>>,
}
10 changes: 9 additions & 1 deletion backend/windmill-worker/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ use futures::{
};

use crate::{
AuthedClient, MAX_RESULT_SIZE, MAX_WAIT_FOR_SIGTERM, TIMEOUT_DURATION, WHITELIST_ENVS,
AuthedClient, MAX_RESULT_SIZE, MAX_WAIT_FOR_SIGTERM, ROOT_CACHE_DIR, TIMEOUT_DURATION,
WHITELIST_ENVS,
};

#[tracing::instrument(level = "trace", skip_all)]
Expand Down Expand Up @@ -679,3 +680,10 @@ async fn append_logs(job_id: uuid::Uuid, logs: impl AsRef<str>, db: impl Borrow<
tracing::error!(%job_id, %err, "error updating logs for job {job_id}: {err}");
}
}

/// Recursively delete the worker's on-disk cache directory (`ROOT_CACHE_DIR`).
///
/// Invoked when the worker group's `cache_clear` value changes, just before
/// the worker restarts, so it comes back up with a cold cache.
///
/// An already-missing directory is treated as success rather than an error:
/// the desired end state (no cache) is already reached, e.g. on a fresh
/// worker that never populated its cache.
pub async fn clean_cache() -> error::Result<()> {
    tracing::info!("Started cleaning cache");
    match tokio::fs::remove_dir_all(ROOT_CACHE_DIR).await {
        Ok(()) => {}
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
            // Nothing to clean — directory was never created or is already gone.
            tracing::info!("Cache directory does not exist, nothing to clean");
        }
        Err(e) => return Err(e.into()),
    }
    tracing::info!("Finished cleaning cache");
    Ok(())
}
2 changes: 1 addition & 1 deletion backend/windmill-worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ mod snowflake_executor;

mod bash_executor;
mod bun_executor;
mod common;
pub mod common;
mod config;
mod dedicated_worker;
mod deno_executor;
Expand Down
27 changes: 11 additions & 16 deletions backend/windmill-worker/src/python_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use windmill_common::{
error::{self, Error},
jobs::QueuedJob,
utils::calculate_hash,
worker::WORKER_CONFIG,
};

lazy_static::lazy_static! {
Expand All @@ -26,20 +27,7 @@ lazy_static::lazy_static! {

static ref PIP_INDEX_URL: Option<String> = std::env::var("PIP_INDEX_URL").ok();
static ref PIP_TRUSTED_HOST: Option<String> = std::env::var("PIP_TRUSTED_HOST").ok();
static ref PIP_LOCAL_DEPENDENCIES: Option<Vec<String>> = {
let pip_local_dependencies = std::env::var("PIP_LOCAL_DEPENDENCIES")
.ok()
.map(|x| x.split(',').map(|x| x.to_string()).collect());
if pip_local_dependencies == Some(vec!["".to_string()]) {
None
} else {
pip_local_dependencies
}
};

static ref ADDITIONAL_PYTHON_PATHS: Option<Vec<String>> = std::env::var("ADDITIONAL_PYTHON_PATHS")
.ok()
.map(|x| x.split(':').map(|x| x.to_string()).collect());

static ref RELATIVE_IMPORT_REGEX: Regex = Regex::new(r#"(import|from)\s(((u|f)\.)|\.)"#).unwrap();

Expand Down Expand Up @@ -84,7 +72,9 @@ pub async fn pip_compile(
logs.push_str(&format!("\nresolving dependencies..."));
set_logs(logs, job_id, db).await;
logs.push_str(&format!("\ncontent of requirements:\n{}\n", requirements));
let requirements = if let Some(pip_local_dependencies) = PIP_LOCAL_DEPENDENCIES.as_ref() {
let requirements = if let Some(pip_local_dependencies) =
WORKER_CONFIG.read().await.pip_local_dependencies.as_ref()
{
let deps = pip_local_dependencies.clone();
requirements
.lines()
Expand Down Expand Up @@ -182,8 +172,13 @@ pub async fn handle_python_job(
) -> windmill_common::error::Result<serde_json::Value> {
create_dependencies_dir(job_dir).await;

let mut additional_python_paths: Vec<String> =
ADDITIONAL_PYTHON_PATHS.to_owned().unwrap_or_else(|| vec![]);
let mut additional_python_paths: Vec<String> = WORKER_CONFIG
.read()
.await
.additional_python_paths
.clone()
.unwrap_or_else(|| vec![])
.clone();

let requirements = match requirements_o {
Some(r) => r,
Expand Down
Loading
Loading