
ObjectStore API to read from remote storage systems #950

Merged · 13 commits · Sep 10, 2021
Conversation

@yjshen (Member) commented Aug 26, 2021

Which issue does this PR close?

Closes #616 .

Rationale for this change

Currently, we can only read files from the local filesystem since we use std::fs in ParquetExec. It would be nice to add support for reading files that reside on storage systems such as HDFS, Amazon S3, etc.

This PR supersedes #811 by adding only the ObjectStore interfaces, leaving the work of hooking these APIs into the rest of the system and rewriting the existing data sources (Parquet, etc.) as separate follow-ups.

What changes are included in this PR?

Introduce the ObjectStore API as an abstraction over the underlying storage systems, such as the local filesystem, HDFS, S3, etc., and make the ObjectStore implementation pluggable through the ObjectStoreRegistry in DataFusion's ExecutionContext.
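As a rough illustration of the abstraction described above, one could imagine a trait along these lines. This is a simplified, synchronous sketch with a toy in-memory backend; the actual trait in the PR is async and stream-based, and every name below is illustrative rather than the PR's real definition.

```rust
use std::collections::BTreeMap;

/// Basic metadata about one object in the store (illustrative).
#[derive(Debug, Clone, PartialEq)]
pub struct FileMeta {
    pub path: String,
    pub size: u64,
}

/// Abstraction over a storage system (local FS, HDFS, S3, ...).
/// The real trait is async; this sketch is synchronous for brevity.
pub trait ObjectStore: Send + Sync {
    /// List all objects whose key starts with the given prefix.
    fn list(&self, prefix: &str) -> Vec<FileMeta>;
    /// Read the full contents of one object.
    fn read(&self, path: &str) -> std::io::Result<Vec<u8>>;
}

/// Toy in-memory implementation, standing in for a real backend.
pub struct InMemoryStore {
    objects: BTreeMap<String, Vec<u8>>,
}

impl InMemoryStore {
    pub fn new(objects: Vec<(&str, &[u8])>) -> Self {
        Self {
            objects: objects
                .into_iter()
                .map(|(k, v)| (k.to_string(), v.to_vec()))
                .collect(),
        }
    }
}

impl ObjectStore for InMemoryStore {
    fn list(&self, prefix: &str) -> Vec<FileMeta> {
        self.objects
            .iter()
            .filter(|(k, _)| k.starts_with(prefix))
            .map(|(k, v)| FileMeta { path: k.clone(), size: v.len() as u64 })
            .collect()
    }

    fn read(&self, path: &str) -> std::io::Result<Vec<u8>> {
        self.objects.get(path).cloned().ok_or_else(|| {
            std::io::Error::new(std::io::ErrorKind::NotFound, path.to_string())
        })
    }
}
```

A real S3 or HDFS backend would implement the same trait against its client library, which is what makes the storage layer pluggable.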

Are there any user-facing changes?

Users can provide implementations of the ObjectStore trait, register them in the ExecutionContext's registry, and run queries against data that resides in remote storage systems as well as the local filesystem (the only default ObjectStore implementation).
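The register-then-resolve flow might be sketched like this, assuming a registry keyed by URI scheme. The real ObjectStoreRegistry may differ in details, and the stub ObjectStore trait here exists only to keep the sketch self-contained.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Minimal stub of the store trait, just enough for the registry sketch.
pub trait ObjectStore: Send + Sync {
    fn name(&self) -> &str;
}

/// The default local-filesystem implementation.
pub struct LocalFileSystem;
impl ObjectStore for LocalFileSystem {
    fn name(&self) -> &str { "local" }
}

/// Maps URI schemes ("file", "s3", "hdfs", ...) to store implementations.
pub struct ObjectStoreRegistry {
    stores: HashMap<String, Arc<dyn ObjectStore>>,
}

impl ObjectStoreRegistry {
    pub fn new() -> Self {
        let mut stores: HashMap<String, Arc<dyn ObjectStore>> = HashMap::new();
        // The local filesystem is the only store registered by default.
        stores.insert("file".to_string(), Arc::new(LocalFileSystem));
        Self { stores }
    }

    pub fn register_store(&mut self, scheme: &str, store: Arc<dyn ObjectStore>) {
        self.stores.insert(scheme.to_string(), store);
    }

    /// Pick a store based on the scheme of a URI like "s3://bucket/key";
    /// scheme-less paths fall back to the local filesystem.
    pub fn store_for_uri(&self, uri: &str) -> Option<Arc<dyn ObjectStore>> {
        let scheme = uri
            .split("://")
            .next()
            .filter(|_| uri.contains("://"))
            .unwrap_or("file");
        self.stores.get(scheme).cloned()
    }
}
```

A user would call register_store("s3", Arc::new(MyS3Store)) once at setup, and all later reads of s3:// paths would be routed to that implementation.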

@github-actions github-actions bot added ballista datafusion Changes in the datafusion crate labels Aug 26, 2021
@rdettai (Contributor) left a comment

Hi @yjshen ! Thanks for opening this new PR with the remote storage system only! Sorry I didn't have the chance to submit all the feedback to the design document in time.

I think it brings a lot of value compared to how things work currently. But I still have the feeling that this is not the ideal level of abstraction. In my opinion (but this might be overthinking), the structure that generates the list of files (ObjectStore.list()) should take:

  • a URI (a string, just like the prefix currently), which could be a sort of path (bucket+prefix) for a plain object store like S3, but could also be something a bit more evolved:
    • an S3 location with hive partitioning (URI=bucket/prefix?partition=year&partition=month)
    • a delta table (URI=bucket/prefix?versionAsOf=v2)
  • an expression so that we can pushdown the filter to the generation of the file list. This is VERY important for very large datasets with lots of files where listing all files is too long.
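The URI decomposition suggested in the first bullet could hypothetically be handled by a small parser like this. Both the function and the query-string convention are purely illustrative; nothing like this exists in the PR.

```rust
/// Split a location like "bucket/prefix?partition=year&partition=month"
/// into its path and its key/value options. Hypothetical convention,
/// not part of the PR's interfaces.
fn parse_location(uri: &str) -> (String, Vec<(String, String)>) {
    match uri.split_once('?') {
        None => (uri.to_string(), Vec::new()),
        Some((path, query)) => {
            let opts = query
                .split('&')
                // Keep only well-formed "key=value" pairs.
                .filter_map(|kv| kv.split_once('='))
                .map(|(k, v)| (k.to_string(), v.to_string()))
                .collect();
            (path.to_string(), opts)
        }
    }
}
```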

Review thread on datafusion/src/datasource/object_store/mod.rs (resolved)
@yjshen (Member, Author) commented Aug 26, 2021

@rdettai Thanks for reviewing 👍

a URI (a string, just like the prefix currently), which could be a sort of path (bucket+prefix) for a plain object store like S3, but could also be something a bit more evolved:

  • an S3 location with hive partitioning (URI=bucket/prefix?partition=year&partition=month)
  • a delta table (URI=bucket/prefix?versionAsOf=v2)

I think this could be achieved inside the S3 object store implementation with another PR on the PartitionedFile abstraction (#946, #932). list could return a stream of PartitionedFile instead of the current FileMeta (PartitionedFile could have a FileMeta field).
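The suggested relationship could be sketched as follows. The field names are guesses for illustration, not the PR's actual definitions.

```rust
/// Basic metadata about one object in the store (illustrative stand-in).
#[derive(Debug, Clone, PartialEq)]
pub struct FileMeta {
    pub path: String,
    pub size: u64,
}

/// A file plus the values of its partition columns, as discussed in
/// #946 / #932. Hypothetical shape, for illustration only.
#[derive(Debug, Clone, PartialEq)]
pub struct PartitionedFile {
    /// Metadata of the underlying object.
    pub file_meta: FileMeta,
    /// Values of the partition columns, e.g. ["2021", "08"] for year/month.
    pub partition_values: Vec<String>,
}
```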

an expression so that we can pushdown the filter to the generation of the file list. This is VERY important for very large datasets with lots of files where listing all files is too long.

I think the current, non-filtering version of listing was chosen here for simplicity. See further discussion on this in the design doc.

@rdettai (Contributor) commented Aug 27, 2021

I think this could be achieved inside the S3 object store implementation with another PR

Agreed, my point is mostly about the naming/docs; I find prefix pretty restrictive.

I think the current, non-filtering version of the listing is made here for simplicity

Understood! As the goal of this PR is to set the interface, I thought we might as well include everything that will be required, and I am pretty sure that filter pushdown to the file listing is a must-have. Note that it can be included in the interface without being used in the implementations at first, just like the filter pushdown in TableProvider.

@rdettai mentioned this pull request Aug 27, 2021
@houqp (Member) left a comment

Overall looks good to me, thanks @yjshen !

Review thread on datafusion/src/datasource/object_store/local.rs (resolved)
Review thread on datafusion/src/datasource/object_store/mod.rs (resolved)
@alamb (Contributor) left a comment

I think this is a good step. thank you @yjshen !

I propose leaving this open for the weekend to gather any additional comments, and if no one objects, merging it in on Monday.

Review thread on datafusion/src/datasource/object_store/local.rs (resolved)
Review thread on datafusion/src/datasource/object_store/mod.rs (resolved)
@houqp (Member) commented Aug 27, 2021

@rdettai on the topics of prefix semantics and filter pushdown: in my mind, these are higher-level logic/abstractions that are not natively supported by the object store. For example, I am under the assumption that TableProvider is responsible for parsing the partition parameters from the URI bucket/prefix?partition=year&partition=month and constructing the list-object API calls with the appropriate prefixes. Similarly, for URIs like bucket/prefix?versionAsOf=v2, the delta table TableProvider should be responsible for parsing out versionAsOf=v2 and issuing the corresponding object list calls to gather the right list of commit/snapshot objects to fetch from the object store.

I am also having a hard time imagining how filter pushdown expressions would be used to help the object store build a better or more efficient API call. It seems like this kind of higher-level logic should be handled at layers above ObjectStore? It is object store agnostic. As a result, it would be better to implement it once in abstraction layers like TableProvider instead of adding a separate implementation in each type of object store.

Does this make sense to you? Or am I missing specific features from some object stores that can support native filter pushdown optimizations at the API level?

@yjshen (Member, Author) commented Aug 28, 2021

@rdettai @houqp After checking the DataFusion and delta-rs code, I think it is more natural to have the TableProvider deal with filters; it is the abstraction over a table, and therefore also a suitable place for table partitions (inferred or user-provided). Lazy file listing could be achieved by the TableProvider, relying on the ObjectStore to list only the relevant files.

@rdettai (Contributor) commented Aug 29, 2021

I am under the assumption that TableProvider is responsible for parsing the partition parameters from the URI bucket/prefix?partition=year&partition=month and constructing the list object API calls with appropriate prefixes

In the case of hive partitioning, you need an extra capability from the object store to achieve that: listing the folders (e.g., for S3, here). Otherwise you cannot know what partitions are there and build the appropriate prefixes. Maybe this example helps give more context: https://docs.google.com/document/d/1nKXPvolft1_DuVjVK7ceOx37arCdQLKc_Im-TfZh9dk/edit?usp=sharing

@houqp (Member) commented Aug 29, 2021

Thanks @rdettai for the detailed Google Doc write-up. I left some comments there :) I think the object store interface proposed in this PR does support listing object keys with partition prefixes, for example: ObjectStore::list("bucket/path/date=2018"). But it's missing an important argument, delimiter, which is required to make "folder" hierarchy discovery more efficient in object stores.
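The delimiter semantics being discussed (mirroring S3's ListObjectsV2, GCS's and Azure Blob's list APIs) can be emulated over a flat key list. A hypothetical sketch, not the PR's code:

```rust
use std::collections::BTreeSet;

/// Emulate delimiter-based listing over a flat set of object keys: keys
/// with no delimiter past the prefix are returned as objects, while deeper
/// keys collapse into "folder" common prefixes. Illustrative only.
fn list_with_delimiter(
    keys: &[&str],
    prefix: &str,
    delimiter: char,
) -> (Vec<String>, Vec<String>) {
    let mut objects = Vec::new();
    let mut common_prefixes = BTreeSet::new();
    for key in keys {
        if let Some(rest) = key.strip_prefix(prefix) {
            match rest.find(delimiter) {
                // Delimiter found below the prefix: report only the "folder",
                // deduplicated across all keys that share it.
                Some(i) => {
                    common_prefixes.insert(format!("{}{}", prefix, &rest[..=i]));
                }
                // No delimiter: a direct child object.
                None => objects.push(key.to_string()),
            }
        }
    }
    (objects, common_prefixes.into_iter().collect())
}
```

This is why the delimiter matters for partition discovery: one call with delimiter '/' reveals the partition "folders" without enumerating every leaf file.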

@rdettai (Contributor) commented Aug 29, 2021

@yjshen @houqp you are right, I think that maybe with the delimiter argument added to ObjectStore.list(), the TableProvider would be able to get all the information it needs to efficiently prune the partitions in a raw hive partitioning setup! This way we leave the ObjectStore abstraction with only core object store capabilities 😄

I am still wondering how we could replicate the hive partitioning support for TableProviders relative to different file formats (ParquetTable, CsvFile, NdJsonFile). Actually, same would apply for a HiveCatalogTable that could actually reference various file formats. It makes me feel that something is wrong with how the different abstractions are organized. This is being discussed in #133.

@yjshen (Member, Author) commented Aug 29, 2021

Thanks @rdettai @houqp for the optional delimiter.

I was thinking the list API could deal with multiple partition columns by listing one directory at a time for filesystems (instead of recursively listing leaf files, as for non-partitioned tables), and could therefore achieve lazy listing when implementing ObjectStore.

After taking both partitioned and non-partitioned tables in one remote storage into consideration, the original list signature would lead to two ObjectStore implementations for one storage system: one for partitioned and one for non-partitioned tables. That's weird. I like the idea of an optional delimiter because it brings a lot of flexibility.

@houqp (Member) commented Aug 30, 2021

I am still wondering how we could replicate the hive partitioning support for TableProviders relative to different file formats (ParquetTable, CsvFile, NdJsonFile).

For raw file tables like a folder of parquet, csv and ndjson files, I think the partition discovery and pruning logic should be agnostic to both file formats and object storages. We could implement this logic as a shared module, which gets invoked in ParquetTable, CsvFile and NdJsonFile table providers to resolve the filtered down object list. Then these table providers will pass down the object paths to corresponding format specific physical execution plans.
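The shared, format- and storage-agnostic partition logic proposed above might, in a very reduced form, look like the following. All names are illustrative, and only a simple equality predicate is shown in place of real expression pruning.

```rust
/// Extract hive-style "col=value" pairs from an object path.
/// E.g. "t/year=2020/month=01/a.parquet" -> [("year","2020"), ("month","01")].
fn partition_values(path: &str) -> Vec<(String, String)> {
    path.split('/')
        .filter_map(|seg| seg.split_once('='))
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect()
}

/// Keep only the paths whose partition column `col` equals `value`.
/// A real implementation would accept an expression, not a single equality.
fn prune<'a>(paths: &[&'a str], col: &str, value: &str) -> Vec<&'a str> {
    paths
        .iter()
        .copied()
        .filter(|p| {
            partition_values(p)
                .iter()
                .any(|(k, v)| k == col && v == value)
        })
        .collect()
}
```

Because this operates purely on paths, the same module could serve the Parquet, CSV, and NDJSON table providers alike, which is the point being made above.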

Actually, same would apply for a HiveCatalogTable that could actually reference various file formats. It makes me feel that something is wrong with how the different abstractions are organized. This is being discussed in #133.

Given that hive catalog manages a collection of databases and tables with their corresponding metadata, I think CatalogProvider is probably the right abstraction for it. The Hive catalog provider could query hive metastore for a given schema and table name. It can then construct a format specific table provider based on hive metastore response.

@houqp (Member) left a comment

LGTM, I will leave it to @alamb to do the final merge so he can take a final look since the last round of update.

prefix: &str,
_delimiter: Option<String>,
) -> Result<FileMetaStream> {
list_all(prefix.to_owned()).await
Contributor:

we can maybe mark this as // TODO as the delimiter is ignored 😉

Member Author:

I think it's fine to leave out the TODO since partitioned file reads are not yet supported?

Contributor:

it seems to me that if you don't comply with the API, a TODO is appropriate even if nobody is using the feature yet 😃

@rdettai (Contributor) commented Aug 30, 2021

also, if the method can return folders, the name of the structs returned (FileMeta) is kind of misleading.

Member Author:

PathMeta? FilePathMeta? 😂 I've run out of names.

Member:

naming is hard indeed :D

how about ListEntry or ListItem? we can define it as an enum of FileMeta and prefix string to distinguish between regular objects and "folder" prefixes.

References:
S3 has prefixes returned in the <CommonPrefixes> section: https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html
GCS has prefixes returned in response object's prefixes property: https://cloud.google.com/storage/docs/json_api/v1/objects/list#response
Azure blob is similar to S3 and has prefix strings returned in its BlobPrefix section: https://docs.microsoft.com/en-us/rest/api/storageservices/enumerating-blob-resources#DelimitedBlobList
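The suggested ListEntry enum might be sketched as follows; the exact definitions in the PR may differ, and this is a simplified illustration.

```rust
/// Basic metadata about one object (illustrative stand-in).
#[derive(Debug, Clone, PartialEq)]
pub struct FileMeta {
    pub path: String,
    pub size: u64,
}

/// A single result of a delimiter-aware listing: either a concrete object
/// (like an entry in S3's <Contents>) or a "folder" common prefix
/// (like S3's <CommonPrefixes>, GCS's `prefixes`, Azure's BlobPrefix).
#[derive(Debug, Clone, PartialEq)]
pub enum ListEntry {
    FileMeta(FileMeta),
    Prefix(String),
}
```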

Member:

There might be value in having two list APIs as well: one for listing objects without a delimiter that returns a stream of FileMeta, and another for listing "directories" with a delimiter that could return either FileMeta or prefix strings.

I am thinking there are use cases where we only need to perform listing without delimiters. For example, after the partition "folders" are fully discovered, we will just use the plain list call to get the final list of objects using partition prefixes. Returning a stream of FileMeta directly in this case avoids enum-matching overhead at the call site.

Contributor:

FWIW, in IOx we went back and forth on this as well. There we have one LIST API https://github.com/influxdata/influxdb_iox/blob/main/object_store/src/lib.rs#L95-L98 that takes an ObjectStorePath, and the path itself can distinguish between "is a (logical) directory" and "is a filename": https://github.com/influxdata/influxdb_iox/blob/main/object_store/src/path.rs#L30-L42

Member:

@alamb that's interesting, do you have any opinion on the design here?

Since list without a delimiter always returns object keys, not logical directory keys, I feel like returning FileMeta from the list call provides a better, more statically typed interface for callers.

In IOx, do you remember what the reason was for going back to ObjectStorePath as the return type for regular list calls?

@yjshen yjshen requested review from houqp and alamb August 31, 2021 04:04
@houqp (Member) left a comment

LGTM, thanks @yjshen for the quick turnaround. @rdettai you want to take another look?

@yjshen (Member, Author) commented Aug 31, 2021

The Python test failure seems unrelated; could you please retrigger that single GitHub Action, @houqp? Thanks!

@houqp (Member) commented Aug 31, 2021

looks like the Python test is broken on master

@yjshen (Member, Author) commented Sep 2, 2021

@alamb What's your opinion on the current list_file and list_dir API?

@houqp (Member) commented Sep 2, 2021

@alamb is on vacation this week :P

@alamb (Contributor) commented Sep 10, 2021

Sorry for the delay @yjshen -- I will review this carefully today.

@alamb (Contributor) commented Sep 10, 2021

@alamb What's your opinion on the current list_file and list_dir API?

I think it looks good 👍

@alamb (Contributor) left a comment

I think this code looks really (really!) good @yjshen -- thank you to you, @rdettai @houqp and all the other contributors who made this a reality ❤️

@alamb (Contributor) commented Sep 10, 2021

We have resolved the Python CI failure on master, so I am going to merge this PR in as is. Thanks again @yjshen

@alamb alamb merged commit 6f53180 into apache:master Sep 10, 2021
@yjshen (Member, Author) commented Sep 10, 2021

Thanks again for all the guidance and help @houqp @rdettai @alamb 🎉 .

@houqp (Member) commented Sep 10, 2021

Amazing work on laying out a solid foundation for IO abstraction in datafusion @yjshen !

Labels: datafusion (Changes in the datafusion crate), enhancement (New feature or request)

Successfully merging this pull request may close these issues.

Add support for reading distributed datasets
4 participants