diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index e593228c03..3da1d16d76 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -4123,4 +4123,40 @@ mod tests { dataset.latest_version_id().await.unwrap() ); } + + #[tokio::test] + async fn concurrent_create() { + async fn write(uri: &str) -> Result<()> { + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "a", + DataType::Int32, + false, + )])); + let empty_reader = RecordBatchIterator::new(vec![], schema.clone()); + Dataset::write(empty_reader, uri, None).await?; + Ok(()) + } + + for _ in 0..5 { + let test_dir = tempdir().unwrap(); + let test_uri = test_dir.path().to_str().unwrap(); + + let (res1, res2) = tokio::join!(write(test_uri), write(test_uri)); + + assert!(res1.is_ok() || res2.is_ok()); + if res1.is_err() { + assert!( + matches!(res1, Err(Error::DatasetAlreadyExists { .. })), + "{:?}", + res1 + ); + } else { + assert!( + matches!(res2, Err(Error::DatasetAlreadyExists { .. })), + "{:?}", + res2 + ); + } + } + } } diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs index be2c75723a..cb03eceeff 100644 --- a/rust/lance/src/io/commit.rs +++ b/rust/lance/src/io/commit.rs @@ -128,7 +128,7 @@ pub(crate) async fn commit_new_dataset( let (mut manifest, indices) = transaction.build_manifest(None, vec![], &transaction_file, write_config)?; - write_manifest_file( + let result = write_manifest_file( object_store, commit_handler, base_path, @@ -141,9 +141,18 @@ pub(crate) async fn commit_new_dataset( write_config, manifest_naming_scheme, ) - .await?; - - Ok(manifest) + .await; + + // TODO: Allow Append or Overwrite mode to retry using `commit_transaction` + // if there is a conflict. + match result { + Ok(()) => Ok(manifest), + Err(CommitError::CommitConflict) => Err(crate::Error::DatasetAlreadyExists { + uri: base_path.to_string(), + location: location!(), + }), + Err(CommitError::OtherError(err)) => Err(err), + } } /// Internal function to check if a manifest could use some migration.