Skip to content

Commit

Permalink
get python tests passing
Browse files Browse the repository at this point in the history
  • Loading branch information
wjones127 committed Nov 12, 2024
1 parent 7569fe2 commit 713ad49
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 5 deletions.
19 changes: 17 additions & 2 deletions python/src/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@ use arrow_array::RecordBatchReader;
use arrow_schema::Schema as ArrowSchema;
use futures::TryFutureExt;
use lance::dataset::fragment::FileFragment as LanceFragment;
use lance::dataset::transaction::Operation;
use lance::dataset::NewColumnTransform;
use lance::Error;
use lance_table::format::{DataFile as LanceDataFile, Fragment as LanceFragmentMetadata};
use lance_table::io::deletion::deletion_file_path;
use pyo3::prelude::*;
use pyo3::{exceptions::*, pyclass::CompareOp, types::PyDict};
use snafu::{location, Location};

use crate::dataset::{get_write_params, transforms_from_python};
use crate::error::PythonErrorExt;
Expand Down Expand Up @@ -474,10 +477,22 @@ pub fn write_fragments(
.map_err(|err| PyIOError::new_err(err.to_string()))?;

assert!(
written.blob.is_none(),
written.blobs_op.is_none(),
"Blob writing is not yet supported by the python _write_fragments API"
);
let fragments = written.default.0;

// Pull the written fragments out of the resulting operation. Only `Overwrite`
// and `Append` are handled here; any other operation is reported as an
// internal error instead of panicking.
let get_fragments = |operation| match operation {
    Operation::Overwrite { fragments, .. } => Ok(fragments),
    Operation::Append { fragments, .. } => Ok(fragments),
    // Tail expression of the closure: no explicit `return` is needed.
    _ => Err(Error::Internal {
        message: "Unexpected operation".into(),
        location: location!(),
    }),
};
let fragments =
get_fragments(written.operation).map_err(|err| PyRuntimeError::new_err(err.to_string()))?;

fragments
.into_iter()
Expand Down
5 changes: 3 additions & 2 deletions rust/lance/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use tracing::instrument;
use write::{CommitBuilder, InsertBuilder, InsertDestination};

mod blob;
pub mod builder;
Expand Down Expand Up @@ -85,7 +84,9 @@ pub use write::merge_insert::{
MergeInsertBuilder, MergeInsertJob, WhenMatched, WhenNotMatched, WhenNotMatchedBySource,
};
pub use write::update::{UpdateBuilder, UpdateJob};
pub use write::{write_fragments, WriteMode, WriteParams};
pub use write::{
write_fragments, CommitBuilder, InsertBuilder, InsertDestination, WriteMode, WriteParams,
};

const INDICES_DIR: &str = "_indices";

Expand Down
14 changes: 13 additions & 1 deletion rust/lance/src/dataset/write/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,13 +152,19 @@ impl<'a> CommitBuilder<'a> {
InsertDestination::Dataset(dataset) => InsertDestination::Dataset(dataset.clone()),
InsertDestination::Uri(uri) => {
// Check if it already exists.
let builder = DatasetBuilder::from_uri(uri).with_read_params(ReadParams {
let mut builder = DatasetBuilder::from_uri(uri).with_read_params(ReadParams {
store_options: self.store_params.clone(),
commit_handler: self.commit_handler.clone(),
object_store_registry: self.object_store_registry.clone(),
..Default::default()
});

// A read_version of zero may mean that no version was supplied with the
// transaction, so only pin the dataset to read_version when it is non-zero;
// otherwise the builder loads the latest version.
if transaction.read_version > 0 {
builder = builder.with_version(transaction.read_version)
}

match builder.load().await {
Ok(dataset) => InsertDestination::Dataset(Arc::new(dataset)),
Err(Error::DatasetNotFound { .. } | Error::NotFound { .. }) => {
Expand Down Expand Up @@ -194,6 +200,12 @@ impl<'a> CommitBuilder<'a> {

let manifest = if let Some(dataset) = dest.dataset() {
if self.detached {
if matches!(manifest_naming_scheme, ManifestNamingScheme::V1) {
return Err(Error::NotSupported {
source: "detached commits cannot be used with v1 manifest paths".into(),
location: location!(),
});
}
commit_detached_transaction(
dataset,
object_store.as_ref(),
Expand Down
9 changes: 9 additions & 0 deletions rust/lance/src/dataset/write/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,15 @@ impl<'a> InsertBuilder<'a> {
}
}

// If we are writing a dataset with non-default storage, we need to enable move stable row ids
if context.dest.dataset().is_none()
&& !context.params.enable_move_stable_row_ids
&& data_schema.fields.iter().any(|f| !f.is_default_storage())
{
log::info!("Enabling move stable row ids because non-default storage is used");
context.params.enable_move_stable_row_ids = true;
}

// Feature flags
if let InsertDestination::Dataset(dataset) = &context.dest {
if !can_write_dataset(dataset.manifest.writer_feature_flags) {
Expand Down

0 comments on commit 713ad49

Please sign in to comment.