
Simplified TableProvider::Insert API #6339

Closed
alamb opened this issue May 11, 2023 · 19 comments
Labels
enhancement New feature or request

Comments

@alamb
Contributor

alamb commented May 11, 2023

Is your feature request related to a problem or challenge?

Recent INSERT work #6049 is a good example of a useful datafusion feature that has an extensibility story (a new function on a trait)

However, it takes non-trivial effort to add such support (it requires a new physical operator).

Describe the solution you'd like

Thus I would like to propose the following API to support writing to sources:

DataSink trait

A new trait that exposes just the information needed for writing. Something like:

/// A `DataSink` implements writing streams of [`RecordBatch`]es to
/// partitioned destinations
pub trait DataSink: std::fmt::Debug + std::fmt::Display + Send + Sync {
    /// How does this sink want its input distributed?
    fn required_input_distribution(&self) -> Distribution;

    /// Return a future that writes a RecordBatchStream to a particular partition
    /// and resolves to the number of rows written
    fn write_stream(&self, partition: usize, input: SendableRecordBatchStream) -> BoxFuture<'_, Result<u64>>;
}

Change the signature of TableProvider

Then if we change the signature of TableProvider from

    /// Insert into this table
    async fn insert_into(
        &self,
        _state: &SessionState,
        _input: Arc<dyn ExecutionPlan>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let msg = "Insertion not implemented for this table".to_owned();
        Err(DataFusionError::NotImplemented(msg))
    }

To something like

    /// Get a sink to use to write to this table, if supported
    async fn sink(
        &self,
    ) -> Result<Arc<dyn DataSink>> {
        let msg = "Insertion not implemented for this table".to_owned();
        Err(DataFusionError::NotImplemented(msg))
    }

I think almost all of the insert plans can share a common ExecutionPlan.
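As a rough sketch of what that shared plan could delegate to (the write_all helper here is hypothetical, invented just for illustration; DataSink refers to the trait proposed above):

use std::sync::Arc;
use datafusion::error::Result;
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::ExecutionPlan;
use futures::future::try_join_all;

/// Hypothetical helper a shared insert ExecutionPlan could delegate to:
/// execute every partition of `input` and forward each stream to `sink`,
/// summing the per-partition row counts.
async fn write_all(
    sink: Arc<dyn DataSink>,
    input: Arc<dyn ExecutionPlan>,
    context: Arc<TaskContext>,
) -> Result<u64> {
    let writes = (0..input.output_partitioning().partition_count())
        .map(|partition| {
            let stream = input.execute(partition, context.clone())?;
            Ok(sink.write_stream(partition, stream))
        })
        .collect::<Result<Vec<_>>>()?;

    // Drive all partition writes concurrently and sum the row counts
    let counts = try_join_all(writes).await?;
    Ok(counts.into_iter().sum())
}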

Describe alternatives you've considered

do nothing

Additional context

No response

@JanKaul
Contributor

JanKaul commented May 12, 2023

This sounds like a great approach. Thank you for bringing it up.

I'm trying to implement the insert operation for Apache Iceberg Tables and I have one point which I would like to make:

Apache Iceberg supports transactions in which data files (Parquet files) can be added to the table. For that, the DataSink trait provides a great way to write the files to external storage. However, after the data files are written, the metadata of the Iceberg table has to be updated. For that it would be necessary to have some kind of flush method in the DataSink trait that allows updating the state of the table.

Something like:

/// A `DataSink` implements writing streams of [`RecordBatch`]es to
/// partitioned destinations
pub trait DataSink: std::fmt::Debug + std::fmt::Display + Send + Sync {
    /// How does this sink want its input distributed?
    fn required_input_distribution(&self) -> Distribution;

    /// Return a future that writes a RecordBatchStream to a particular partition
    /// and resolves to the number of rows written
    fn write_stream(&self, partition: usize, input: SendableRecordBatchStream) -> BoxFuture<'_, Result<u64>>;

    /// Indicate that all record batches have been written, allowing the metadata
    /// of the table to be updated. This can be used to end a transaction on the table.
    fn flush(&mut self) -> BoxFuture<'_, Result<()>>;
}
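For illustration, a hypothetical Iceberg-style sink might use flush roughly like this. Everything named IcebergTableState, write_data_files, and commit_metadata is invented for the sketch (and the Debug/Display impls are omitted); only the DataSink shape comes from the trait above:

use std::sync::{Arc, Mutex};
use futures::{future::BoxFuture, FutureExt};

/// Hypothetical shared state: paths of the data files written so far
/// within the current transaction
struct IcebergTableState {
    pending_files: Vec<String>,
}

struct IcebergSink {
    state: Arc<Mutex<IcebergTableState>>,
}

impl DataSink for IcebergSink {
    fn required_input_distribution(&self) -> Distribution {
        Distribution::UnspecifiedDistribution
    }

    fn write_stream(&self, partition: usize, input: SendableRecordBatchStream) -> BoxFuture<'_, Result<u64>> {
        async move {
            // Optimistically write the stream out as data files on object
            // storage, remembering their paths (write_data_files is hypothetical)
            let (paths, rows) = write_data_files(partition, input).await?;
            self.state.lock().unwrap().pending_files.extend(paths);
            Ok(rows)
        }
        .boxed()
    }

    fn flush(&mut self) -> BoxFuture<'_, Result<()>> {
        async move {
            // All partitions are written: atomically commit new table metadata
            // referencing the pending files (commit_metadata is hypothetical and
            // would fail if another writer committed in the meantime)
            let files = std::mem::take(&mut self.state.lock().unwrap().pending_files);
            commit_metadata(files).await
        }
        .boxed()
    }
}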

@JanKaul
Contributor

JanKaul commented May 12, 2023

And should the methods be async? If the data is written to an object_store, it has to be async to know if the write was successful.

@tustvold
Contributor

tustvold commented May 12, 2023

And should the methods be async

The return type is BoxFuture, which allows for asynchronous completion. We could also use async_trait, as we use that elsewhere; the two are roughly the same thing, but one just has more macro magic and slightly more complex lifetimes 😄
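For reference, the async_trait form of the proposed trait would look roughly like this (a sketch; the #[async_trait] macro desugars each async fn into a method returning a boxed future):

use async_trait::async_trait;

#[async_trait]
pub trait DataSink: std::fmt::Debug + std::fmt::Display + Send + Sync {
    /// How does this sink want its input distributed?
    fn required_input_distribution(&self) -> Distribution;

    /// Desugars to roughly `fn write_stream(&self, ...) -> BoxFuture<'_, Result<u64>>`
    async fn write_stream(&self, partition: usize, input: SendableRecordBatchStream) -> Result<u64>;
}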

For that it would be necessary to have some kind of flush method in the DataSink trait that allows to update the state of the table

As the returned type is BoxFuture, the implementation could update the state once the stream has been exhausted, i.e. something like:

struct Table {
    state: Arc<Mutex<TableState>>,
}

impl TableProvider for Table {
    // (other required TableProvider methods omitted for brevity)
    async fn sink(&self) -> Result<Arc<dyn DataSink>> {
        Ok(Arc::new(TableWriter { state: self.state.clone() }))
    }
}

struct TableState {
    batches: Vec<RecordBatch>,
}

struct TableWriter {
    state: Arc<Mutex<TableState>>,
}

impl DataSink for TableWriter {
    fn required_input_distribution(&self) -> Distribution {
        // No particular distribution requirements for this example
        Distribution::UnspecifiedDistribution
    }

    fn write_stream(&self, _partition: usize, input: SendableRecordBatchStream) -> BoxFuture<'_, Result<u64>> {
        async move {
            // Collect the input (try_collect requires futures::TryStreamExt)
            let mut batches: Vec<RecordBatch> = input.try_collect().await?;
            let rows: u64 = batches.iter().map(|b| b.num_rows() as u64).sum();
            // Commit transaction
            self.state.lock().unwrap().batches.append(&mut batches);
            Ok(rows)
        }
        .boxed()
    }
}

return the number of rows written

I personally would return () and leave metrics accounting as a TableProvider detail

@JanKaul
Contributor

JanKaul commented May 12, 2023

Thanks for your explanation. There is just one small detail that would be great for Iceberg/Deltalake.

Apache Iceberg and Deltalake use MVCC to guarantee atomic transactions on tables. Therefore they optimistically write the data of a transaction to some kind of storage. Once the data is written, the metadata of the table is updated if no other process has updated the metadata in the meantime.
With the approach that you are suggesting, the atomicity of a write transaction can only be guaranteed on a per-partition basis. This could lead to a scenario where one partition is written successfully, then another process updates the metadata, and writing the following partitions fails. Moreover, one would need to be really careful that the different asynchronous tasks don't invalidate the write operations of the other tasks, even within the same logical write transaction.

The current proposal for the DataSink trait definitely simplifies the implementation of insert operations for Iceberg and Deltalake tables. It just doesn't allow implementing the full functionality for Iceberg and Deltalake. I just wanted to mention that.

Forget my comment on the asynchronous method, I somehow missed the BoxFuture.

@alamb
Contributor Author

alamb commented May 12, 2023

@JanKaul I think we could add a finish() or commit() for DataSink without too much difficulty. I'll see what I can do

@alamb
Contributor Author

alamb commented May 12, 2023

(BTW, I plan to use async_trait so that the fact that the trait returns futures is more obvious.)

@tustvold
Contributor

tustvold commented May 12, 2023

the atomicity of a write transaction can only be guaranteed on a partition basis

I think you could do something where the last write_stream to complete flushes, but it is a bit odd for sure
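One way to express "last writer flushes" (purely a sketch; the counter type is invented here, not part of the proposal):

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Counts outstanding partition writes; the writer that finishes last
/// performs the commit
#[derive(Clone)]
struct LastWriterFlushes {
    remaining: Arc<AtomicUsize>,
}

impl LastWriterFlushes {
    fn new(partitions: usize) -> Self {
        Self { remaining: Arc::new(AtomicUsize::new(partitions)) }
    }

    /// Returns true for exactly one caller: the last partition to finish
    fn finish_one(&self) -> bool {
        self.remaining.fetch_sub(1, Ordering::AcqRel) == 1
    }
}

// Inside a hypothetical write_stream implementation:
//
//     if counter.finish_one() {
//         commit().await?; // commit table metadata exactly once
//     }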

This does lead to one question though: what actually is the meaning of partitioning in this context? I don't believe the formulation of Distribution is sufficient to express the partitioning that something like Iceberg or Deltalake would require.

What does exposing the partitioning yield over something simple like

trait TableProvider {
    async fn insert(&self, ctx: Arc<TaskContext>, plan: Arc<dyn ExecutionPlan>) -> Result<()>;
}

@alamb
Contributor Author

alamb commented May 12, 2023

I think you could do something where the last write_stream to complete flushes, but it is a bit odd for sure

I think it would be a common pattern (where you either want to commit all the partitions or none, rather than have some of them possibly complete and some fail). 🤔

This does lead to one question though, what actually is the meaning of partitioning in this context?

It allows the Sink to request data be split according to one of the following options:

https://docs.rs/datafusion/latest/datafusion/physical_plan/enum.Partitioning.html

This lets the DataFusion optimizer do its thing to optimize the computation.
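For example, a sink that needs to see every row in a single stream (say, to commit one transaction at the end) could request that like so; a minimal sketch against the physical-plan Distribution enum:

fn required_input_distribution(&self) -> Distribution {
    // Ask the optimizer to merge all input partitions into one stream,
    // so a single write_stream call observes every row
    Distribution::SinglePartition
}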

The formulation of Distribution I don't believe is sufficient to express the partitioning that something like a Iceberg or Deltalake would require.

Can you please be more specific about what type of partitioning would be required? Are you thinking of "partition by value in a column (like the date)"?

What does exposing the partitioning yield over something simple like

It makes it easier to implement DataSink as you don't have to worry about the details of ExecutionPlans and connecting things up. Also, if we use partitioning then the calculation can be exposed to the rest of datafusion over time (e.g. take advantage of partitioned grouping)

@tustvold
Contributor

tustvold commented May 12, 2023

I think it would be a common pattern

I'd wager almost 100% of workloads would want atomicity at the ExecutionPlan level, and how this is achieved will be very specific to that TableProvider.

Are you thinking of "partition by value in a column (like the date)"

Partitioning or bucketing by value would be the most common use-case, which is distinct from the sort of partitioning currently implemented by DataFusion.

It makes it easier to implement DataSink as you don't have to worry about the details of ExecutionPlans and connecting things up

Unless I'm missing something, it's the difference between calling ExecutionPlan::execute and being given the result? Is that really a meaningful complexity?

I guess I'm just trying to play devil's advocate for Keep-It-Simple 😄

@JanKaul
Contributor

JanKaul commented May 12, 2023

Iceberg allows partitioning by hash, by truncating a value, or by date. I was thinking of expressing the partitioning as a DISTRIBUTE BY (https://docs.rs/datafusion/24.0.0/datafusion/prelude/enum.Partitioning.html#variant.DistributeBy) and then using RepartitionExec on the input to apply it to the query. But that is just an initial idea; I didn't get to it yet.
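As a sketch of that idea (assuming the logical Partitioning::DistributeBy variant linked above; the "date" partition column is made up):

use datafusion::error::Result;
use datafusion::logical_expr::{LogicalPlan, LogicalPlanBuilder, Partitioning};
use datafusion::prelude::col;

/// Repartition the insert input so rows with the same value of a
/// hypothetical "date" partition column land in the same output partition
fn distribute_by_date(input: LogicalPlan) -> Result<LogicalPlan> {
    LogicalPlanBuilder::from(input)
        .repartition(Partitioning::DistributeBy(vec![col("date")]))?
        .build()
}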

@tustvold
Contributor

tustvold commented May 12, 2023

partitioning as a DISTRIBUTE BY

Yes, that is the construction you would definitely want. I don't believe a corresponding notion exists in the physical layer, which has pretty hard-coded assumptions about partition enumerability

TBC this is something we probably should fix, but it is likely a very intrusive change to decouple ExecutionPlan from partitioning

@alamb
Contributor Author

alamb commented May 12, 2023

Yes, that is the construction you would definitely want. I don't believe a corresponding notion exists in the physical layer, which has pretty hard-coded assumptions about partition enumerability

I think @JanKaul's idea in #6339 (comment) would work well

Unless I'm missing something, it's the difference between calling ExecutionPlan::execute and being given the result? Is that really a meaningful complexity?

I am trying to avoid having to write a new ExecutionPlan for each table provider (aka avoid all the boilerplate here): https://github.com/apache/arrow-datafusion/blob/main/datafusion/core/src/physical_plan/memory.rs

If the API is like this:

trait TableProvider {
    async fn insert(&self, ctx: Arc<TaskContext>, plan: Arc<dyn ExecutionPlan>) -> Result<()>;
}

I think the DataFusion implementation would be simpler, but now each sink would be potentially more complicated as it would have to deal with running multiple streams concurrently. But maybe that is ok 🤔

I am also trying to keep Execution and datasource separate (so I can eventually break them into different crates) -- maybe I can just make another trait to abstract away the execution plan detail

I am about to get on a plane -- I'll play around with it

@ozankabak
Contributor

ozankabak commented May 13, 2023

So we have the "one-exec-for-each-provider" pattern on the read side, but we have a "single-exec-across-all-providers" pattern on the write side? Am I misunderstanding this? If this is indeed the case, what is the motivation and/or justification behind this asymmetry?

@JanKaul
Contributor

JanKaul commented May 13, 2023

For the read side you can often make use of existing Execs. In my use case for Apache Iceberg I can implement the TableProvider and return a ParquetExec with the necessary files in the scan method. So I don't actually have to implement an Exec myself.

@alamb
Contributor Author

alamb commented May 14, 2023

So we have the "one-exec-for-each-provider" pattern on the read side, but we have a "single-exec-across-all-providers" pattern on the write side? Am I misunderstanding this? If this is indeed the case, what is the motivation and/or justification behind this asymmetry?

The asymmetry is a good question @ozankabak

My thinking is

  1. The amount of common, replicated code, between inserts is substantial (when prototyping RFC: Implement initial support for COPY ... TO ... statement #6313 I had to make an execution plan that was 90% the same as MemoryWriteExec)
  2. The semantics of COPY or Insert are typically to return the number of rows written, not something that depends on the type of table being written into.
  3. I think long term, splitting out the physical plans and datasources (e.g. split datafusion-physical-plan sub-module #1754) will make the codebase easier to work with and keep the separation cleaner, so connecting the datasource to the physical plan more tightly would work in the opposite direction.

I think we could achieve most of the above by keeping the same insert_into that returns ExecutionPlan in TableProvider and refactoring the implementation.

However, it seemed like most of the flexibility gained by using an ExecutionPlan would only serve to be more confusing.

TableProvider and return a ParquetExec with the necessary files in the scan method. So I don't actually have to implement an Exec myself.

IOx follows this strategy as well

@ozankabak
Contributor

ozankabak commented May 14, 2023

Aren't these arguments mostly applicable to the read side as well? I am worried that we may prematurely commit to a design without enough clarity in all possible implications and usage patterns.

I think this kind of a design question should be decided after we have some more concrete cases (i.e. more write execs like we have read execs now) and thus have enough data points to analyze various use cases. Only then we can perform a more serious pros/cons analysis. In this case, maybe the decision will be to refactor the read side as well, or maybe we'll see that doing this is not the right thing to do even for the write side.

@alamb
Contributor Author

alamb commented May 15, 2023

Aren't these arguments mostly applicable to the read side as well?

Yes, basically, and if I could change it now I would change it not to return an ExecutionPlan somehow.

I am worried that we may prematurely commit to a design without enough clarity in all possible implications and usage patterns.

I think this kind of a design question should be decided after we have some more concrete cases (i.e. more write execs like we have read execs now) and thus have enough data points to analyze various use cases. Only then we can perform a more serious pros/cons analysis. In this case, maybe the decision will be to refactor the read side as well, or maybe we'll see that doing this is not the right thing to do even for the write side.

I think this is a good point (and there is some evidence that we don't really know what the API should be in the discussions above with @tustvold, like #6339 (comment)). I think we can leave the TableProvider API in terms of ExecutionPlan and simply refactor the implementations to require less boilerplate code.

Let me spend some more time with #6347 refining the idea

@alamb
Contributor Author

alamb commented May 15, 2023

Here is a proposed PR that makes it easier to write insert plans, but does not change the TableProvider API: #6354

@alamb
Contributor Author

alamb commented Jun 3, 2023

I think this is done for now. Closing
