From 2c5c3782c808c5450c7c5fe7ad73f2aff9169c33 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Sun, 12 Nov 2023 20:19:13 -0800 Subject: [PATCH] Use a unique lock key per table with the deltalake library Oxbow will maintain its own outer locking mechanism, but for the locking on the underlying deltatable, this ensures that each table being accessed has its own key being used. Fixes #9 --- Cargo.toml | 2 +- src/bin/lambda.rs | 14 +++++++++++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d5c05e9..5a85613 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "oxbow" -version = "0.6.1" +version = "0.6.2" edition = "2021" keywords = ["deltalake", "parquet", "lambda", "delta"] homepage = "https://github.com/buoyant-data/oxbow" diff --git a/src/bin/lambda.rs b/src/bin/lambda.rs index f837dbd..89fd350 100644 --- a/src/bin/lambda.rs +++ b/src/bin/lambda.rs @@ -77,8 +77,20 @@ async fn func<'a>(event: LambdaEvent) -> Result { .expect("Failed to get the files for a table, impossible!"); // messages is just for sending responses out of the lambda let mut messages = vec![]; + let mut storage_options: HashMap = HashMap::default(); + // Ensure that the DeltaTable we get back uses the table-name as a partition key + // when locking in DynamoDb: + // + // Without this setting each Lambda invocation will use the same default key `delta-rs` + // when locking in DynamoDb. + storage_options.insert( + "DYNAMO_LOCK_PARTITION_KEY_VALUE".into(), + format!("{table_name}:delta").into(), + ); - if let Ok(mut table) = deltalake::open_table(&table_name).await { + if let Ok(mut table) = + deltalake::open_table_with_storage_options(&table_name, storage_options).await + { info!("Opened table to append: {:?}", table); let lock = acquire_lock(table_name, &lock_client).await;