Skip to content

Commit

Permalink
Use a unique lock key per table with the deltalake library
Browse files Browse the repository at this point in the history
Oxbow will maintain its own outer locking mechanism, but for the locking on the
underlying deltatable, this ensures that each table being accessed has its own
key being used.

Fixes #9
  • Loading branch information
rtyler committed Nov 13, 2023
1 parent b5c32d4 commit 2c5c378
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 2 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "oxbow"
version = "0.6.1"
version = "0.6.2"
edition = "2021"
keywords = ["deltalake", "parquet", "lambda", "delta"]
homepage = "https://github.com/buoyant-data/oxbow"
Expand Down
14 changes: 13 additions & 1 deletion src/bin/lambda.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,20 @@ async fn func<'a>(event: LambdaEvent<SqsEvent>) -> Result<Value, Error> {
.expect("Failed to get the files for a table, impossible!");
// messages is just for sending responses out of the lambda
let mut messages = vec![];
let mut storage_options: HashMap<String, String> = HashMap::default();
// Ensure that the DeltaTable we get back uses the table-name as a partition key
// when locking in DynamoDb: <https://github.com/buoyant-data/oxbow/issues/9>
//
// Without this setting each Lambda invocation will use the same default key `delta-rs`
// when locking in DynamoDb.
storage_options.insert(
"DYNAMO_LOCK_PARTITION_KEY_VALUE".into(),
format!("{table_name}:delta").into(),
);

if let Ok(mut table) = deltalake::open_table(&table_name).await {
if let Ok(mut table) =
deltalake::open_table_with_storage_options(&table_name, storage_options).await
{
info!("Opened table to append: {:?}", table);
let lock = acquire_lock(table_name, &lock_client).await;

Expand Down

0 comments on commit 2c5c378

Please sign in to comment.