Skip to content

Commit

Permalink
Object Store API to read from remote storage systems
Browse files Browse the repository at this point in the history
  • Loading branch information
yjshen committed Aug 26, 2021
1 parent d31c157 commit 827072d
Show file tree
Hide file tree
Showing 5 changed files with 286 additions and 0 deletions.
2 changes: 2 additions & 0 deletions ballista/rust/core/src/serde/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use datafusion::arrow::datatypes::{DataType, Schema, SchemaRef};
use datafusion::catalog::catalog::{
CatalogList, CatalogProvider, MemoryCatalogList, MemoryCatalogProvider,
};
use datafusion::datasource::object_store::ObjectStoreRegistry;
use datafusion::execution::context::{
ExecutionConfig, ExecutionContextState, ExecutionProps,
};
Expand Down Expand Up @@ -627,6 +628,7 @@ impl TryFrom<&protobuf::PhysicalExprNode> for Arc<dyn PhysicalExpr> {
aggregate_functions: Default::default(),
config: ExecutionConfig::new(),
execution_props: ExecutionProps::new(),
object_store_registry: Arc::new(ObjectStoreRegistry::new()),
};

let fun_expr = functions::create_physical_fun(
Expand Down
1 change: 1 addition & 0 deletions datafusion/src/datasource/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub mod datasource;
pub mod empty;
pub mod json;
pub mod memory;
pub mod object_store;
pub mod parquet;

pub use self::csv::{CsvFile, CsvReadOptions};
Expand Down
128 changes: 128 additions & 0 deletions datafusion/src/datasource/object_store/local.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Object store that represents the Local File System.

use async_trait::async_trait;
use futures::{stream, AsyncRead, StreamExt};
use std::sync::Arc;

use crate::datasource::object_store::{
FileMeta, FileMetaStream, ObjectReader, ObjectStore,
};
use crate::error::DataFusionError;
use crate::error::Result;
use std::fs::Metadata;

#[derive(Debug)]
/// Local File System as Object Store.
///
/// This is a field-less unit struct: it carries no configuration or state.
pub struct LocalFileSystem;

#[async_trait]
impl ObjectStore for LocalFileSystem {
    /// Streams the metadata of the files found under `prefix`.
    ///
    /// The work is delegated to `list_all`, which takes ownership of the
    /// prefix, hence the copy made here.
    async fn list(&self, prefix: &str) -> Result<FileMetaStream> {
        let root = String::from(prefix);
        list_all(root).await
    }

    /// Wraps `file` in a `LocalFileReader` and hands it back as a shared
    /// [`ObjectReader`] trait object.
    async fn get_reader(&self, file: FileMeta) -> Result<Arc<dyn ObjectReader>> {
        let reader: Arc<dyn ObjectReader> = Arc::new(LocalFileReader::new(file)?);
        Ok(reader)
    }
}

/// Reader for a single file on the local file system.
struct LocalFileReader {
/// Metadata (path, modification time, size) of the file this reader targets.
file: FileMeta,
}

impl LocalFileReader {
fn new(file: FileMeta) -> Result<Self> {
Ok(Self { file })
}
}

#[async_trait]
impl ObjectReader for LocalFileReader {
    /// Not implemented yet: calling this panics through `todo!()`.
    async fn get_reader(
        &self,
        _start: u64,
        _length: usize,
    ) -> Result<Arc<dyn AsyncRead>> {
        todo!()
    }

    /// Returns the size recorded in the file's metadata, or `0` when the
    /// metadata carries no size.
    async fn length(&self) -> Result<u64> {
        Ok(self.file.size.unwrap_or(0))
    }
}

/// Lists the files reachable from `prefix` as a [`FileMetaStream`].
///
/// * If `prefix` is a regular file, the stream yields exactly that file.
/// * Otherwise `prefix` is treated as a directory and walked recursively.
///   Pending directories are kept on a stack, so the most recently
///   discovered directory is visited next. All files of one directory are
///   emitted before the next directory is read.
///
/// # Errors
///
/// Returns an error immediately if the metadata of `prefix` cannot be read.
/// Failures that happen later (an unreadable directory, a path that is not
/// valid UTF-8) are delivered as an `Err` item inside the stream; the walk
/// then continues with the directories that are still pending.
async fn list_all(prefix: String) -> Result<FileMetaStream> {
    /// Builds the `FileMeta` for `path` from its file system metadata.
    fn get_meta(path: String, metadata: Metadata) -> FileMeta {
        FileMeta {
            path,
            // `modified()` can fail; in that case the error is dropped and
            // the timestamp is recorded as unknown.
            last_modified: metadata.modified().map(chrono::DateTime::from).ok(),
            size: Some(metadata.len()),
        }
    }

    /// Reads one directory level: returns the non-directory entries of
    /// `path` and pushes every sub-directory onto `to_visit`.
    async fn find_files_in_dir(
        path: String,
        to_visit: &mut Vec<String>,
    ) -> Result<Vec<FileMeta>> {
        let mut dir = tokio::fs::read_dir(path).await?;
        let mut files = Vec::new();

        while let Some(child) = dir.next_entry().await? {
            // Paths are carried around as `String`, so a non UTF-8 entry
            // cannot be represented and aborts the listing of this directory.
            // The owned copy is made once and moved into whichever
            // collection needs it.
            let child_path = match child.path().to_str() {
                Some(valid) => valid.to_owned(),
                None => return Err(DataFusionError::Plan("Invalid path".to_string())),
            };
            let metadata = child.metadata().await?;
            if metadata.is_dir() {
                to_visit.push(child_path);
            } else {
                files.push(get_meta(child_path, metadata))
            }
        }
        Ok(files)
    }

    let prefix_meta = tokio::fs::metadata(&prefix).await?;
    if prefix_meta.is_file() {
        // `prefix` is already owned, so it is moved into the stream directly.
        Ok(Box::pin(stream::once(async move {
            Ok(get_meta(prefix, prefix_meta))
        })))
    } else {
        // The unfold state is the stack of directories still to be read; each
        // step produces the sub-stream for one directory, and `flatten`
        // concatenates those sub-streams.
        let result = stream::unfold(vec![prefix], move |mut to_visit| async move {
            match to_visit.pop() {
                None => None,
                Some(path) => {
                    let file_stream = match find_files_in_dir(path, &mut to_visit).await {
                        Ok(files) => stream::iter(files).map(Ok).left_stream(),
                        // Surface the failure as a single stream item and keep going.
                        Err(e) => stream::once(async { Err(e) }).right_stream(),
                    };

                    Some((file_stream, to_visit))
                }
            }
        })
        .flatten();
        Ok(Box::pin(result))
    }
}
127 changes: 127 additions & 0 deletions datafusion/src/datasource/object_store/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Object Store abstracts access to an underlying file/object storage.

pub mod local;

use std::collections::HashMap;
use std::fmt::Debug;
use std::pin::Pin;
use std::sync::{Arc, RwLock};

use async_trait::async_trait;
use futures::{AsyncRead, Stream};

use local::LocalFileSystem;

use crate::error::{DataFusionError, Result};
use chrono::Utc;

/// Object Reader for one file in an object store
#[async_trait]
pub trait ObjectReader {
/// Get a reader, asynchronously, for the part of the file that begins at
/// offset `start` and spans `length` (presumably both measured in bytes)
async fn get_reader(&self, start: u64, length: usize) -> Result<Arc<dyn AsyncRead>>;

/// Get the length of the file asynchronously
async fn length(&self) -> Result<u64>;
}

/// Metadata of a file, as reported by an object store.
///
/// `Debug` and `Clone` are derived so that the metadata can be logged and
/// handed to several consumers.
#[derive(Debug, Clone)]
pub struct FileMeta {
    /// Path of the file
    pub path: String,
    /// Last time the file was modified, in UTC; `None` when unknown
    pub last_modified: Option<chrono::DateTime<Utc>>,
    /// Total size of the file; `None` when unknown
    pub size: Option<u64>,
}

/// Stream of the files listed from an object store; every item is either the
/// metadata of one file or the error hit while listing
pub type FileMetaStream =
Pin<Box<dyn Stream<Item = Result<FileMeta>> + Send + Sync + 'static>>;

/// An ObjectStore abstracts access to an underlying file/object storage.
/// It maps strings (e.g. URLs, filesystem paths, etc) to sources of bytes
#[async_trait]
pub trait ObjectStore: Sync + Send + Debug {
/// Returns all the files in path `prefix` asynchronously
async fn list(&self, prefix: &str) -> Result<FileMetaStream>;

/// Get an object reader for one file
async fn get_reader(&self, file: FileMeta) -> Result<Arc<dyn ObjectReader>>;
}

/// Scheme under which the local file system store is registered by default.
static LOCAL_SCHEME: &str = "file";

/// A Registry holds all the object stores at runtime with a scheme for each store.
/// This allows the user to extend DataFusion with different storage systems such as S3 or HDFS
/// and query data inside these systems.
pub struct ObjectStoreRegistry {
/// A map from scheme to the object store that serves list / read operations for that scheme
pub object_stores: RwLock<HashMap<String, Arc<dyn ObjectStore>>>,
}

impl ObjectStoreRegistry {
    /// Creates the registry that object stores can be registered into.
    ///
    /// A [`LocalFileSystem`] store is registered under the `file` scheme by
    /// default so that local files can be read natively.
    pub fn new() -> Self {
        let mut map: HashMap<String, Arc<dyn ObjectStore>> = HashMap::new();
        map.insert(LOCAL_SCHEME.to_string(), Arc::new(LocalFileSystem));

        Self {
            object_stores: RwLock::new(map),
        }
    }

    /// Adds a new store to this registry.
    ///
    /// If a store was already registered for the same scheme, it is replaced
    /// in the registry and the previous store is returned.
    ///
    /// # Panics
    ///
    /// Panics if the registry lock is poisoned.
    pub fn register_store(
        &self,
        scheme: String,
        store: Arc<dyn ObjectStore>,
    ) -> Option<Arc<dyn ObjectStore>> {
        let mut stores = self.object_stores.write().unwrap();
        stores.insert(scheme, store)
    }

    /// Gets the store registered for `scheme`, if any.
    ///
    /// The lookup is an exact match on the key used at registration time.
    ///
    /// # Panics
    ///
    /// Panics if the registry lock is poisoned.
    pub fn get(&self, scheme: &str) -> Option<Arc<dyn ObjectStore>> {
        let stores = self.object_stores.read().unwrap();
        stores.get(scheme).cloned()
    }

    /// Gets a suitable store for the URI based on its scheme. For example:
    /// a URI with scheme `file` or with no scheme returns the store
    /// registered for local files, a URI with scheme `s3` returns the S3
    /// store if one is registered.
    ///
    /// The scheme is everything before the first `:` and is lower-cased
    /// before the lookup.
    ///
    /// # Errors
    ///
    /// Returns [`DataFusionError::Internal`] when the URI has a scheme for
    /// which no store is registered.
    ///
    /// # Panics
    ///
    /// Panics if the registry lock is poisoned.
    pub fn get_by_uri(&self, uri: &str) -> Result<Arc<dyn ObjectStore>> {
        // NOTE(review): any text before the first ':' counts as a scheme, so a
        // Windows path such as `C:\data` is looked up under scheme `c`.
        match uri.split_once(':') {
            Some((scheme, _)) => match self.get(&scheme.to_lowercase()) {
                Some(store) => Ok(store),
                None => Err(DataFusionError::Internal(format!(
                    "No suitable object store found for {}",
                    scheme
                ))),
            },
            // Scheme-less URIs resolve to the same store as `file:` URIs, so
            // a user-supplied replacement for the local store is honoured in
            // both cases. A plain `LocalFileSystem` is used only when nothing
            // is registered under the local scheme.
            None => match self.get(LOCAL_SCHEME) {
                Some(store) => Ok(store),
                None => Ok(Arc::new(LocalFileSystem)),
            },
        }
    }
}

impl Default for ObjectStoreRegistry {
    /// Equivalent to [`ObjectStoreRegistry::new`].
    fn default() -> Self {
        Self::new()
    }
}
28 changes: 28 additions & 0 deletions datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use crate::catalog::{
ResolvedTableReference, TableReference,
};
use crate::datasource::csv::CsvFile;
use crate::datasource::object_store::{ObjectStore, ObjectStoreRegistry};
use crate::datasource::parquet::ParquetTable;
use crate::datasource::TableProvider;
use crate::error::{DataFusionError, Result};
Expand Down Expand Up @@ -164,6 +165,7 @@ impl ExecutionContext {
aggregate_functions: HashMap::new(),
config,
execution_props: ExecutionProps::new(),
object_store_registry: Arc::new(ObjectStoreRegistry::new()),
})),
}
}
Expand Down Expand Up @@ -363,6 +365,29 @@ impl ExecutionContext {
self.state.lock().unwrap().catalog_list.catalog(name)
}

/// Registers a custom `ObjectStore` under `scheme`, so that an external file
/// system or object storage system can be used with this context.
///
/// Returns the `ObjectStore` previously registered for this scheme, if any.
///
/// # Panics
///
/// Panics if locking the context state fails.
pub fn register_object_store(
    &self,
    scheme: impl Into<String>,
    object_store: Arc<dyn ObjectStore>,
) -> Option<Arc<dyn ObjectStore>> {
    let scheme: String = scheme.into();
    let state = self.state.lock().unwrap();
    state
        .object_store_registry
        .register_store(scheme, object_store)
}

/// Looks up the `ObjectStore` registered for `scheme`.
///
/// Returns `None` when no store is registered under that scheme.
///
/// # Panics
///
/// Panics if locking the context state fails.
pub fn object_store(&self, scheme: &str) -> Option<Arc<dyn ObjectStore>> {
    let state = self.state.lock().unwrap();
    state.object_store_registry.get(scheme)
}

/// Registers a table using a custom `TableProvider` so that
/// it can be referenced from SQL statements executed against this
/// context.
Expand Down Expand Up @@ -849,6 +874,8 @@ pub struct ExecutionContextState {
pub config: ExecutionConfig,
/// Execution properties
pub execution_props: ExecutionProps,
/// Object stores that are registered with the context
pub object_store_registry: Arc<ObjectStoreRegistry>,
}

impl ExecutionProps {
Expand Down Expand Up @@ -876,6 +903,7 @@ impl ExecutionContextState {
aggregate_functions: HashMap::new(),
config: ExecutionConfig::new(),
execution_props: ExecutionProps::new(),
object_store_registry: Arc::new(ObjectStoreRegistry::new()),
}
}

Expand Down

0 comments on commit 827072d

Please sign in to comment.