Skip to content

Commit

Permalink
[Conductor] Share UDF cache across instances to limit memory usage (#32437)
Browse files Browse the repository at this point in the history

GitOrigin-RevId: 6ae52f5f654c4ec792e046a5cca252730940bf3b
  • Loading branch information
goffrie authored and Convex, Inc. committed Dec 19, 2024
1 parent 2a26e56 commit 5400577
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 44 deletions.
7 changes: 6 additions & 1 deletion crates/application/src/application_function_runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,10 @@ use crate::{
log_function_wait_timeout,
log_mutation_already_committed,
},
cache::CacheManager,
cache::{
CacheManager,
QueryCache,
},
function_log::{
ActionCompletion,
FunctionExecutionLog,
Expand Down Expand Up @@ -589,6 +592,7 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
module_cache: Arc<dyn ModuleLoader<RT>>,
function_log: FunctionExecutionLog<RT>,
system_env_vars: BTreeMap<EnvVarName, EnvVarValue>,
cache: QueryCache,
) -> Self {
// We limit the isolates to only consume fraction of the available
// cores leaving the rest for tokio. This is still over-provisioning
Expand All @@ -613,6 +617,7 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
database.clone(),
isolate_functions.clone(),
function_log.clone(),
cache,
);

Self {
Expand Down
92 changes: 51 additions & 41 deletions crates/application/src/cache/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use std::{
cmp,
collections::BTreeMap,
fmt,
mem,
sync::{
atomic::{
AtomicU32,
Ordering,
},
Arc,
LazyLock,
},
Expand All @@ -22,7 +25,6 @@ use common::{
knobs::{
DATABASE_UDF_SYSTEM_TIMEOUT,
DATABASE_UDF_USER_TIMEOUT,
UDF_CACHE_MAX_SIZE,
},
query_journal::QueryJournal,
runtime::Runtime,
Expand Down Expand Up @@ -100,31 +102,31 @@ pub struct CacheManager<RT: Runtime> {
function_router: FunctionRouter<RT>,
udf_execution: FunctionExecutionLog<RT>,

cache: Cache,
instance_id: InstanceId,
cache: QueryCache,
}

#[derive(Clone, Eq, PartialEq, Hash)]
/// Process-unique id distinguishing the cache key space of one `CacheManager`
/// from others sharing the same `QueryCache`.
#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
struct InstanceId(u32);
impl InstanceId {
    /// Hands out sequential ids starting from 0.
    ///
    /// # Panics
    /// Panics after `u32::MAX` allocations. Unlike a bare `fetch_add`, the
    /// `checked_add` inside `fetch_update` leaves the counter untouched on
    /// overflow, so ids can never silently wrap and collide even if the
    /// panic is caught.
    fn allocate() -> Self {
        static NEXT_INSTANCE_ID: AtomicU32 = AtomicU32::new(0);
        let id = NEXT_INSTANCE_ID
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |n| n.checked_add(1))
            .expect("instance id overflow");
        InstanceId(id)
    }
}

/// Key identifying a cached query result in the shared `QueryCache`.
///
/// Includes the owning `CacheManager`'s `InstanceId` so that instances
/// sharing one cache never collide on otherwise-equal keys.
#[derive(Clone, Eq, PartialEq, Hash, Debug)]
pub struct CacheKey {
    // Per-`CacheManager` key space within the shared cache.
    instance: InstanceId,
    path: PublicFunctionPath,
    args: ConvexArray,
    // Cache-key form of the caller's identity (see `Identity::cache_key`).
    identity: IdentityCacheKey,
    journal: QueryJournal,
    allowed_visibility: AllowedVisibility,
}

impl fmt::Debug for CacheKey {
    /// Formats every field through the standard `debug_struct` helper,
    /// chaining on the builder directly.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("CacheKey")
            .field("path", &self.path)
            .field("args", &self.args)
            .field("identity", &self.identity)
            .field("journal", &self.journal)
            .field("allowed_visibility", &self.allowed_visibility)
            .finish()
    }
}

impl CacheKey {
/// Approximate size in-memory of the CacheEntry structure, including stack
/// and heap allocated memory.
Expand Down Expand Up @@ -182,13 +184,18 @@ impl<RT: Runtime> CacheManager<RT> {
database: Database<RT>,
function_router: FunctionRouter<RT>,
udf_execution: FunctionExecutionLog<RT>,
cache: QueryCache,
) -> Self {
// each `CacheManager` (for a different instance) gets its own cache key space
// within the shared `QueryCache`, which has a _global_ size-limit
let instance_id = InstanceId::allocate();
Self {
rt,
database,
function_router,
udf_execution,
cache: Cache::new(),
instance_id,
cache,
}
}

Expand Down Expand Up @@ -247,6 +254,7 @@ impl<RT: Runtime> CacheManager<RT> {
let start = self.rt.monotonic_now();
let identity_cache_key = identity.cache_key();
let key = CacheKey {
instance: self.instance_id,
path: path.clone(),
args: args.clone(),
identity: identity_cache_key,
Expand Down Expand Up @@ -357,7 +365,7 @@ impl<RT: Runtime> CacheManager<RT> {
async fn perform_cache_op(
&self,
key: &CacheKey,
op: CacheOp,
op: CacheOp<'_>,
usage_tracker: FunctionUsageTracker,
) -> anyhow::Result<Option<(CacheResult, BTreeMap<TableName, TableStats>)>> {
let r = match op {
Expand Down Expand Up @@ -436,8 +444,8 @@ impl<RT: Runtime> CacheManager<RT> {
let query_outcome = UdfOutcome::from_error(
js_err,
path.clone().debug_into_component_path(),
args,
identity.into(),
args.clone(),
identity.clone().into(),
self.rt.clone(),
None,
)?;
Expand All @@ -451,7 +459,7 @@ impl<RT: Runtime> CacheManager<RT> {
tx,
path_and_args,
UdfType::Query,
journal,
journal.clone(),
context,
)
.await?;
Expand Down Expand Up @@ -555,11 +563,11 @@ impl<RT: Runtime> CacheManager<RT> {
/// Guard for an in-flight ("waiting") cache entry under `key`; cleanup of
/// the waiting entry happens in this type's `Drop` impl (defined below).
struct WaitingEntryGuard<'a> {
    // `None` when there is no waiting entry to clean up on drop.
    entry_id: Option<u64>,
    key: &'a CacheKey,
    cache: QueryCache,
}

impl<'a> WaitingEntryGuard<'a> {
fn new(entry_id: Option<u64>, key: &'a CacheKey, cache: Cache) -> Self {
fn new(entry_id: Option<u64>, key: &'a CacheKey, cache: QueryCache) -> Self {
Self {
entry_id,
key,
Expand Down Expand Up @@ -588,36 +596,38 @@ impl<'a> Drop for WaitingEntryGuard<'a> {
/// State behind the `QueryCache` mutex.
struct Inner {
    // LRU map of entries. The map itself is unbounded; eviction is driven
    // by the byte-size accounting below (see `enforce_size_limit`).
    cache: LruCache<CacheKey, CacheEntry>,
    // Approximate total in-memory size of all entries, in bytes.
    size: usize,
    // Entries are popped LRU-first while `size` exceeds this limit.
    size_limit: usize,

    // Source of unique ids handed to waiting entries.
    next_waiting_id: u64,
}

#[derive(Clone)]
struct Cache {
/// A size-limited query cache, shared across `CacheManager` instances.
/// Cloning is cheap: all clones share the same underlying state via `Arc`.
pub struct QueryCache {
    inner: Arc<Mutex<Inner>>,
}

impl Cache {
fn new() -> Self {
impl QueryCache {
/// Creates an empty cache that evicts least-recently-used entries once the
/// accounted size exceeds `size_limit` bytes.
pub fn new(size_limit: usize) -> Self {
    Self {
        inner: Arc::new(Mutex::new(Inner {
            cache: LruCache::unbounded(),
            size: 0,
            size_limit,
            next_waiting_id: 0,
        })),
    }
}

fn plan_cache_op(
fn plan_cache_op<'a>(
&self,
key: &CacheKey,
key: &'a CacheKey,
start: tokio::time::Instant,
now: tokio::time::Instant,
identity: &Identity,
identity: &'a Identity,
ts: Timestamp,
context: ExecutionContext,
) -> Option<CacheOp> {
) -> Option<CacheOp<'a>> {
let go = |sender: Option<(Sender<_>, u64)>| {
let (sender, waiting_entry_id) = match sender {
Some((sender, waiting_entry_id)) => (sender, Some(waiting_entry_id)),
Expand All @@ -631,11 +641,11 @@ impl Cache {
CacheOp::Go {
waiting_entry_id,
sender,
path: key.path.clone(),
args: key.args.clone(),
identity: identity.clone(),
path: &key.path,
args: &key.args,
identity,
ts,
journal: key.journal.clone(),
journal: &key.journal,
allowed_visibility: key.allowed_visibility.clone(),
context,
}
Expand Down Expand Up @@ -811,7 +821,7 @@ impl Inner {

/// Pop records until the cache is under the given size.
fn enforce_size_limit(&mut self) {
while self.size > *UDF_CACHE_MAX_SIZE {
while self.size > self.size_limit {
let (popped_key, popped_entry) = self
.cache
.pop_lru()
Expand All @@ -822,7 +832,7 @@ impl Inner {
}
}

enum CacheOp {
enum CacheOp<'a> {
Ready {
result: CacheResult,
},
Expand All @@ -834,11 +844,11 @@ enum CacheOp {
Go {
waiting_entry_id: Option<u64>,
sender: Sender<CacheResult>,
path: PublicFunctionPath,
args: ConvexArray,
identity: Identity,
path: &'a PublicFunctionPath,
args: &'a ConvexArray,
identity: &'a Identity,
ts: Timestamp,
journal: QueryJournal,
journal: &'a QueryJournal,
allowed_visibility: AllowedVisibility,
context: ExecutionContext,
},
Expand Down
3 changes: 3 additions & 0 deletions crates/application/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,7 @@ pub mod test_helpers;
#[cfg(test)]
mod tests;

pub use crate::cache::QueryCache;
use crate::metrics::{
log_external_deps_package,
log_source_package_size_bytes_total,
Expand Down Expand Up @@ -557,6 +558,7 @@ impl<RT: Runtime> Application<RT> {
snapshot_import_pause_client: PauseClient,
scheduled_jobs_pause_client: PauseClient,
app_auth: Arc<ApplicationAuth>,
cache: QueryCache,
) -> anyhow::Result<Self> {
let module_cache = ModuleCache::new(runtime.clone(), modules_storage.clone()).await;
let module_loader = Arc::new(module_cache.clone());
Expand Down Expand Up @@ -622,6 +624,7 @@ impl<RT: Runtime> Application<RT> {
module_loader,
function_log.clone(),
system_env_vars.clone(),
cache,
));
function_runner.set_action_callbacks(runner.clone());

Expand Down
7 changes: 6 additions & 1 deletion crates/application/src/test_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ use common::{
components::ComponentId,
db_schema,
http::fetch::StaticFetchClient,
knobs::ACTION_USER_TIMEOUT,
knobs::{
ACTION_USER_TIMEOUT,
UDF_CACHE_MAX_SIZE,
},
log_streaming::NoopLogSender,
pause::{
PauseClient,
Expand Down Expand Up @@ -102,6 +105,7 @@ use value::{
};

use crate::{
cache::QueryCache,
cron_jobs::CronJobExecutor,
deploy_config::{
SchemaStatus,
Expand Down Expand Up @@ -305,6 +309,7 @@ impl<RT: Runtime> ApplicationTestExt<RT> for Application<RT> {
kb.clone(),
Arc::new(NullAccessTokenAuth),
)),
QueryCache::new(*UDF_CACHE_MAX_SIZE),
)
.await?;

Expand Down
4 changes: 4 additions & 0 deletions crates/common/src/knobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ pub static RUNTIME_DISABLE_LIFO_SLOT: LazyLock<bool> =
pub static UDF_CACHE_MAX_SIZE: LazyLock<usize> =
LazyLock::new(|| env_config("UDF_CACHE_MAX_SIZE", 104857600));

/// Maximum size of the shared UDF cache in Conductor. Default 500MiB
/// (5 x the 100MiB `UDF_CACHE_MAX_SIZE` default); overridable via the
/// `SHARED_UDF_CACHE_MAX_SIZE` environment variable.
pub static SHARED_UDF_CACHE_MAX_SIZE: LazyLock<usize> =
    LazyLock::new(|| env_config("SHARED_UDF_CACHE_MAX_SIZE", 5 * 104857600));

/// How many UDF execution logs to keep in memory.
pub static MAX_UDF_EXECUTION: LazyLock<usize> =
LazyLock::new(|| env_config("MAX_UDF_EXECUTION", 1000));
Expand Down
7 changes: 6 additions & 1 deletion crates/local_backend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@ use application::{
api::ApplicationApi,
log_visibility::AllowLogging,
Application,
QueryCache,
};
use common::{
http::{
fetch::ProxiedFetchClient,
RouteMapper,
},
knobs::ACTION_USER_TIMEOUT,
knobs::{
ACTION_USER_TIMEOUT,
UDF_CACHE_MAX_SIZE,
},
log_streaming::NoopLogSender,
pause::PauseClient,
persistence::Persistence,
Expand Down Expand Up @@ -253,6 +257,7 @@ pub async fn make_app(
key_broker.clone(),
Arc::new(NullAccessTokenAuth),
)),
QueryCache::new(*UDF_CACHE_MAX_SIZE),
)
.await?;

Expand Down

0 comments on commit 5400577

Please sign in to comment.