Avoid adding redundant files to existing tables when triggered
There are some scenarios, particularly with Lambda invocations, where Oxbow may
be called repeatedly with the same file. This change modifies append_to_table()
so that it will not add files which already exist in the loaded Delta
Table.

Fixes #3
rtyler committed Oct 21, 2023
1 parent 37e87ea commit ceb38f8
Showing 1 changed file with 121 additions and 38 deletions.
159 changes: 121 additions & 38 deletions src/lib.rs
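As a rough illustration of the behavior this change enables, below is a minimal sketch of a one-shot caller that can now be retried safely, for example from a Lambda trigger that fires more than once for the same object. The sync_location wrapper, the oxbow import path, and the call to deltalake::open_table are assumptions made for the sketch and are not part of this commit.

use deltalake::{DeltaResult, DeltaTable};
use oxbow::{append_to_table, discover_parquet_files, object_store_for};

// Idempotent sync entrypoint: safe to invoke repeatedly for the same files,
// since append_to_table() now skips files already present in the Delta log.
async fn sync_location(table_uri: &str) -> DeltaResult<DeltaTable> {
    // table_uri is expected to be a full URL, e.g. s3://bucket/prefix or file:///path
    let url = url::Url::parse(table_uri).expect("Failed to parse the table URI");
    let store = object_store_for(&url);

    // Discover every .parquet file currently present at the location
    let files = discover_parquet_files(store.clone()).await?;

    // Appending the same discovery result twice no longer duplicates entries
    let mut table = deltalake::open_table(table_uri).await?;
    append_to_table(&files, &mut table).await?;
    table.load().await?;
    Ok(table)
}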
@@ -15,7 +15,7 @@ use url::Url;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

-/*
+/**
* convert is the main function to be called by the CLI or other "one shot" executors which just
* need to take a given location and convert it all at once
*/
@@ -51,15 +51,15 @@ pub async fn convert(location: &str) -> DeltaResult<DeltaTable> {
}
}

-/*
+/**
* Create the ObjectStore for the given location
*/
pub fn object_store_for(location: &Url) -> Arc<DeltaObjectStore> {
let options = HashMap::new();
Arc::new(DeltaObjectStore::try_new(location.clone(), options).expect("Failed to make store"))
}

-/*
+/**
* Discover `.parquet` files which are present in the location
*/
pub async fn discover_parquet_files(
@@ -89,7 +89,7 @@ pub async fn discover_parquet_files(
Ok(result)
}

-/*
+/**
* Create a Delta table with the given series of files at the specified location
*/
pub async fn create_table_with(
@@ -141,11 +141,18 @@ pub async fn create_table_with(
.await
}

-/*
+/**
* Append the given files to an already existing and initialized Delta Table
*/
pub async fn append_to_table(files: &[ObjectMeta], table: &mut DeltaTable) -> DeltaResult<i64> {
-    let actions = add_actions_for(files);
+    let existing_files = table.get_files();
+    let new_files: Vec<ObjectMeta> = files
+        .iter()
+        .filter(|f| !existing_files.contains(&f.location))
+        .cloned()
+        .collect();
+
+    let actions = add_actions_for(&new_files);

deltalake::operations::transaction::commit(
table.object_store().as_ref(),
@@ -161,7 +168,7 @@ pub async fn append_to_table(files: &[ObjectMeta], table: &mut DeltaTable) -> De
.await
}

-/*
+/**
* Take an iterator of files and determine what looks like a partition column from it
*/
fn partition_columns_from(files: &[ObjectMeta]) -> Vec<String> {
@@ -184,7 +191,7 @@ fn partition_columns_from(files: &[ObjectMeta]) -> Vec<String> {
results.into_iter().collect()
}

-/*
+/**
* Return all the partitions from the given path buf
*/
fn partitions_from(path_str: &str) -> Vec<DeltaTablePartition> {
@@ -194,7 +201,7 @@ fn partitions_from(path_str: &str) -> Vec<DeltaTablePartition> {
.collect()
}

-/*
+/**
* Provide a series of Add actions for the given ObjectMeta entries
*
* This is a critical translation layer between discovered parquet files and how those would be
@@ -222,7 +229,7 @@ pub fn add_actions_for(files: &[ObjectMeta]) -> Vec<Action> {
.collect()
}

-/*
+/**
* Return the smallest file from the given set of files.
*
* This can be useful to find the smallest possible parquet file to load from the set in order to
@@ -246,11 +253,73 @@ fn find_smallest_file(files: &Vec<ObjectMeta>) -> Option<&ObjectMeta> {
#[cfg(test)]
mod tests {
use super::*;
use std::hash::Hash;

use chrono::prelude::Utc;
use deltalake::Path;

/*
* test utilities to share between test cases
*/
mod util {
use std::collections::HashSet;
use std::hash::Hash;
use std::sync::Arc;

use url::Url;

use super::*;

pub(crate) fn assert_unordered_eq<T>(left: &[T], right: &[T])
where
T: Eq + Hash + std::fmt::Debug,
{
let left: HashSet<_> = left.iter().collect();
let right: HashSet<_> = right.iter().collect();

assert_eq!(left, right);
}

#[allow(dead_code)]
/**
* Helper function to use when debugging to list temp and other directories recursively
*/
fn list_directory(path: &std::path::Path) {
if path.is_dir() {
for entry in std::fs::read_dir(path).unwrap() {
let entry = entry.unwrap();
println!("{}", entry.path().display());
list_directory(&entry.path());
}
}
}

/**
* Create a temporary directory filled with parquet files but no _delta_log
* for testing.
*
* The [tempfile::TempDir] must be passed to the caller to ensure the destructor doesn't
* delete the directory before it is used further in the test.
*/
pub(crate) fn create_temp_path_with(
fixture_table_path: &str,
) -> (tempfile::TempDir, Arc<DeltaObjectStore>) {
use fs_extra::{copy_items, dir::CopyOptions, remove_items};

let path = std::fs::canonicalize(fixture_table_path).expect("Failed to canonicalize");
let dir = tempfile::tempdir().expect("Failed to create a temporary directory");

let options = CopyOptions::new();
let _ = copy_items(&vec![path.as_path()], dir.path(), &options)
.expect("Failed to copy items over");
// Remove the tempdir's copied _delta_log/ since the test must recreate it
remove_items(&vec![dir.path().join("_delta_log")])
.expect("Failed to remove temp _delta_log/");

let url = Url::from_file_path(dir.path()).expect("Failed to parse local path");
(dir, object_store_for(&url))
}
}

#[tokio::test]
async fn discover_parquet_files_empty_dir() {
let dir = tempfile::tempdir().expect("Failed to create a temporary directory");
@@ -376,16 +445,6 @@ mod tests {
assert_eq!(expected, partition_columns_from(&files));
}

-    fn assert_unordered_eq<T>(left: &[T], right: &[T])
-    where
-        T: Eq + Hash + std::fmt::Debug,
-    {
-        let left: HashSet<_> = left.iter().collect();
-        let right: HashSet<_> = right.iter().collect();
-
-        assert_eq!(left, right);
-    }
-
#[test]
fn partition_columns_from_multiple_partition() {
let expected: Vec<String> = vec!["c2".into(), "c3".into()];
@@ -404,33 +463,20 @@ mod tests {
size: 1024,
},
];
-        assert_unordered_eq(&expected, &partition_columns_from(&files));
+        util::assert_unordered_eq(&expected, &partition_columns_from(&files));
}

/*
* See <https://github.com/buoyant-data/oxbow/issues/2>
*/
#[tokio::test]
async fn create_schema_for_partitioned_path() {
-        use fs_extra::{copy_items, dir::CopyOptions, remove_items};
-
-        let path = std::fs::canonicalize("./tests/data/hive/deltatbl-partitioned")
-            .expect("Failed to canonicalize");
-        let dir = tempfile::tempdir().expect("Failed to create a temporary directory");
-
-        let options = CopyOptions::new();
-        copy_items(&vec![path.as_path()], dir.path(), &options).expect("Failed to copy items over");
-        // Remove the tempdir's copied _delta_log/ since the test must recreate it
-        remove_items(&vec![dir.path().join("_delta_log")])
-            .expect("Failed to remove temp _delta_log/");
-
-        let url = Url::from_file_path(dir.path()).expect("Failed to parse local path");
-        let store = object_store_for(&url);
-
+        let (_tempdir, store) =
+            util::create_temp_path_with("./tests/data/hive/deltatbl-partitioned");
let files = discover_parquet_files(store.clone())
.await
.expect("Failed to discover parquet files");
-        assert_eq!(files.len(), 4);
+        assert_eq!(files.len(), 4, "No files discovered");

let parts = partition_columns_from(&files);
assert_eq!(
@@ -448,4 +494,41 @@ mod tests {
"The schema does not include the expected partition key `c2`"
);
}

/*
* Ensure that the append_to_table() function does not add redundant files already added to the
* Delta Table
*
* <https://github.com/buoyant-data/oxbow/issues/3>
*/
#[tokio::test]
async fn test_avoiding_adding_duplicate_files() {
let (_tempdir, store) =
util::create_temp_path_with("./tests/data/hive/deltatbl-partitioned");

let files = discover_parquet_files(store.clone())
.await
.expect("Failed to discover parquet files");
assert_eq!(files.len(), 4, "No files discovered");

let mut table = create_table_with(&files, store.clone())
.await
.expect("Failed to create table");
let schema = table.get_schema().expect("Failed to get schema");
assert!(
schema.get_field_with_name("c2").is_ok(),
"The schema does not include the expected partition key `c2`"
);
assert_eq!(
table.get_files().len(),
4,
"Did not find the right number of tables"
);

append_to_table(&files, &mut table)
.await
.expect("Failed to append files");
table.load().await.expect("Failed to reload the table");
assert_eq!(table.get_files().len(), 4, "Found redundant files!");
}
}
