feat: cloud_upload(<filename>) table func #2670

Merged
merged 36 commits into main from grey/read_upload on Feb 21, 2024

Changes from 22 commits

Commits (36)
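The PR adds a table function for reading files uploaded to GlareDB Cloud. A minimal usage sketch, based on the EXAMPLE string in the diff below (the file name is illustrative; this snapshot of the changes registers the function as glaredb_upload, which the PR title indicates was later renamed to cloud_upload):

-- Query a file previously uploaded to GlareDB Cloud.
-- Function name per this snapshot of the branch; the PR title points at the final name cloud_upload.
SELECT * FROM glaredb_upload('my_upload.csv');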
11112a9
wip: skeleton for glare_upload table func
greyscaled Feb 15, 2024
1b93331
chore(minor): typos
greyscaled Feb 15, 2024
2728252
broken state
greyscaled Feb 16, 2024
5d3a128
glare -> glaredb
greyscaled Feb 16, 2024
9661db1
GlareDB
greyscaled Feb 16, 2024
7146d41
pairing
tychoish Feb 16, 2024
1c7876c
more notes
tychoish Feb 16, 2024
8d08263
changes
greyscaled Feb 16, 2024
cfff8fc
getting closer
greyscaled Feb 16, 2024
5cbc401
testing something on cld
greyscaled Feb 16, 2024
31c7be4
Merge branch 'main' into grey/read_upload
greyscaled Feb 16, 2024
42f6063
Merge branch 'main' into grey/read_upload
greyscaled Feb 20, 2024
b70a6ea
fmt
greyscaled Feb 20, 2024
a5167b4
fixup! fmt
greyscaled Feb 20, 2024
fb11d2d
Merge origin/main
greyscaled Feb 20, 2024
1a0a2ad
let's see if it works...
greyscaled Feb 20, 2024
1fc158f
remove unused field
greyscaled Feb 20, 2024
527e691
fixup! remove unused field
greyscaled Feb 20, 2024
934d353
refactor and fmt
greyscaled Feb 20, 2024
0a6638d
lint
greyscaled Feb 20, 2024
545b648
impl
greyscaled Feb 20, 2024
44327de
clippy, fmt
greyscaled Feb 21, 2024
d631109
cleanup the listing
greyscaled Feb 21, 2024
9917374
just fmt
greyscaled Feb 21, 2024
c346c30
comment
greyscaled Feb 21, 2024
6818cb0
move var
greyscaled Feb 21, 2024
c6fc3c0
remove CI changes
greyscaled Feb 21, 2024
0ba3826
Merge origin/main
greyscaled Feb 21, 2024
d908328
err if running locally
greyscaled Feb 21, 2024
514299c
fmt
greyscaled Feb 21, 2024
78f8c03
Revert "remove CI changes"
greyscaled Feb 21, 2024
5aa18c7
s/glaredb_uplopad/cloud_upload
greyscaled Feb 21, 2024
875fd20
Reapply "remove CI changes"
greyscaled Feb 21, 2024
4d1fc54
Merge origin/main
greyscaled Feb 21, 2024
fa7d126
fix comment
greyscaled Feb 21, 2024
900ea5b
Merge branch 'main' into grey/read_upload
greyscaled Feb 21, 2024
11 changes: 5 additions & 6 deletions .github/workflows/ci.yaml
@@ -593,10 +593,10 @@ jobs:
- name: PostgreSQL
path: "sqllogictests_postgres/*"
prepare: |
./scripts/prepare-testdata.sh
export POSTGRES_TEST_DB=$(./scripts/create-test-postgres-db.sh)
export POSTGRES_CONN_STRING=$(echo "$POSTGRES_TEST_DB" | sed -n 1p)
export POSTGRES_TUNNEL_SSH_CONN_STRING=$(echo "$POSTGRES_TEST_DB" | sed -n 2p)
./scripts/prepare-testdata.sh
export POSTGRES_TEST_DB=$(./scripts/create-test-postgres-db.sh)
export POSTGRES_CONN_STRING=$(echo "$POSTGRES_TEST_DB" | sed -n 1p)
export POSTGRES_TUNNEL_SSH_CONN_STRING=$(echo "$POSTGRES_TEST_DB" | sed -n 2p)
runs-on: ubuntu-latest
needs: ["sql-logic-tests"]
steps:
@@ -629,10 +629,9 @@ jobs:

just slt-bin --protocol=flightsql ${{matrix.settings.path}}


docker-push:
name: Build and Push Docker Image
if: github.event_name == 'push' && github.repository == 'GlareDB/glaredb' && github.ref == 'refs/heads/main'
# if: github.event_name == 'push' && github.repository == 'GlareDB/glaredb' && github.ref == 'refs/heads/main'
needs: ["sql-logic-tests"]
runs-on: ubuntu-latest-8-cores
permissions:
4 changes: 2 additions & 2 deletions crates/datasources/src/bson/table.rs
@@ -75,7 +75,7 @@ pub async fn bson_streaming_table(
// field name against the schema, and only pull out the
// fields that match. This is easier in the short term
// but less performant for large documents where the
// docuemnts are a superset of the schema, we'll end up
// documents are a superset of the schema, we'll end up
// doing much more parsing work than is actually needed
// for the bson documents.
|bt: Result<BytesMut, std::io::Error>| -> Result<RawDocumentBuf, BsonError> {
@@ -112,7 +112,7 @@ pub async fn bson_streaming_table(
readers.pop_front();
}

// infer the sechema; in the future we can allow users to specify the
// infer the schema; in the future we can allow users to specify the
// schema directly; in the future users could specify the schema (kind
// of as a base-level projection, but we'd need a schema specification
// language). Or have some other strategy for inference rather than
32 changes: 21 additions & 11 deletions crates/datasources/src/native/access.rs
@@ -41,11 +41,13 @@ use crate::native::insert::NativeTableInsertExec;
#[derive(Debug, Clone)]
pub struct NativeTableStorage {
db_id: Uuid,
/// URL pointing to the bucket and/or directory which is the root of the native storage.

/// URL pointing to the bucket and/or directory which is the root of the
/// native storage, for example `gs://<bucket-name>`.
///
/// In other words this is the location to which the the table prefix is applied to get
/// a full table URL.
root_url: Url,
/// In other words this is the location to which the the table prefix is
/// applied to get a full table URL.
pub root_url: Url,

/// Tables are only located in one bucket which the provided service account
/// should have access to.
@@ -56,12 +58,15 @@ pub struct NativeTableStorage {
/// what this type is for.
///
/// Arcs all the way down...
store: SharedObjectStore,
pub store: SharedObjectStore,
}
// Deltalake is expecting a factory that implements `ObjectStoreFactory` and `LogStoreFactory`.
// Since we already have an object store, we don't need to do anything here,
// but we still need to register the url with delta-rs so it does't error when it tries to validate the object-store.
// So we just create a fake factory that returns the object store we already have and register it with the root url.

/// Deltalake is expecting a factory that implements [`ObjectStoreFactory`] and
/// [`LogStoreFactory`]. Since we already have an object store, we don't need to
/// do anything here, but we still need to register the url with delta-rs so it
/// does't error when it tries to validate the object-store. So we just create a
/// fake factory that returns the object store we already have and register it
/// with the root url.
struct FakeStoreFactory {
pub store: ObjectStoreRef,
}
@@ -88,6 +93,10 @@ impl LogStoreFactory for FakeStoreFactory {
Ok(default_logstore(store, location, options))
}
}

/// DeltaField represents data types as stored in Delta Lake, with additional
/// metadata for indicating the 'real' (original) type, for cases when
/// downcasting occurs.
struct DeltaField {
data_type: DeltaDataType,
metadata: Option<HashMap<String, Value>>,
@@ -142,8 +151,8 @@ fn arrow_to_delta_safe(arrow_type: &DataType) -> DeltaResult<DeltaField> {
}

impl NativeTableStorage {
/// Create a native table storage provider from a URL and an object store instance
/// rooted at that location.
/// Create a native table storage provider from a URL and an object store
/// instance rooted at that location.
pub fn new(db_id: Uuid, root_url: Url, store: Arc<dyn ObjectStore>) -> NativeTableStorage {
// register the default handlers
// TODO, this should only happen once
@@ -172,6 +181,7 @@ impl NativeTableStorage {
self.db_id
}

/// Returns the location of 'native' Delta Lake tables.
fn table_prefix(&self, tbl_id: u32) -> String {
format!("databases/{}/tables/{}", self.db_id, tbl_id)
}
18 changes: 18 additions & 0 deletions crates/datasources/src/object_store/mod.rs
@@ -284,6 +284,24 @@ pub struct ObjStoreTableProvider {
file_format: Arc<dyn FileFormat>,
}

impl ObjStoreTableProvider {
pub fn new(
store: Arc<dyn ObjectStore>,
arrow_schema: SchemaRef,
base_url: ObjectStoreUrl,
objects: Vec<ObjectMeta>,
file_format: Arc<dyn FileFormat>,
) -> ObjStoreTableProvider {
ObjStoreTableProvider {
store,
arrow_schema,
base_url,
objects,
file_format,
}
}
}

#[async_trait]
impl TableProvider for ObjStoreTableProvider {
fn as_any(&self) -> &dyn Any {
9 changes: 4 additions & 5 deletions crates/sqlbuiltins/src/functions/mod.rs
@@ -35,13 +35,11 @@ use table::{BuiltinTableFuncs, TableFunc};
use self::alias_map::AliasMap;
use crate::functions::scalars::openai::OpenAIEmbed;

/// All builtin functions available for all sessions.
/// FUNCTION_REGISTRY provides all implementations of [`BuiltinFunction`]
pub static FUNCTION_REGISTRY: Lazy<FunctionRegistry> = Lazy::new(FunctionRegistry::new);

/// A builtin function.
/// This trait is implemented by all builtin functions.
/// This is used to derive catalog entries for all supported functions.
/// Any new function MUST implement this trait.
/// BuiltinFunction **MUST** be implemented by all builtin functions, including
/// new ones. This is used to derive catalog entries for all supported functions.
pub trait BuiltinFunction: Sync + Send {
/// The name for this function. This name will be used when looking up
/// function implementations.
@@ -87,6 +85,7 @@ pub trait ConstBuiltinFunction: Sync + Send {
None
}
}

/// The namespace of a function.
///
/// Optional -> "namespace.function" || "function"
7 changes: 4 additions & 3 deletions crates/sqlbuiltins/src/functions/table/mod.rs
@@ -47,15 +47,15 @@ use self::json::JsonScan;
use self::lance::LanceScan;
use self::mongodb::ReadMongoDb;
use self::mysql::ReadMysql;
use self::object_store::{READ_CSV, READ_JSON, READ_PARQUET};
use self::object_store::{GlareDBUpload, READ_CSV, READ_JSON, READ_PARQUET};
use self::postgres::ReadPostgres;
use self::snowflake::ReadSnowflake;
use self::sqlite::ReadSqlite;
use self::sqlserver::ReadSqlServer;
use self::system::cache_external_tables::CacheExternalDatabaseTables;
use self::virtual_listing::{ListColumns, ListSchemas, ListTables};
use super::alias_map::AliasMap;
use super::BuiltinFunction;
use crate::functions::alias_map::AliasMap;
use crate::functions::BuiltinFunction;

/// A builtin table function.
/// Table functions are ones that are used in the FROM clause.
@@ -103,6 +103,7 @@ impl BuiltinTableFuncs {
Arc::new(READ_JSON),
Arc::new(BsonScan),
Arc::new(JsonScan),
Arc::new(GlareDBUpload),
// Data lakes
Arc::new(DeltaScan),
Arc::new(IcebergScan),
135 changes: 130 additions & 5 deletions crates/sqlbuiltins/src/functions/table/object_store.rs
@@ -1,5 +1,6 @@
use std::collections::HashMap;
use std::marker::PhantomData;
use std::path::Path;
use std::sync::Arc;
use std::vec;

@@ -16,19 +17,21 @@ use datafusion::logical_expr::{Signature, TypeSignature, Volatility};
use datafusion_ext::errors::{ExtensionError, Result};
use datafusion_ext::functions::{FuncParamValue, IdentValue, TableFuncContextProvider};
use datasources::common::url::{DatasourceUrl, DatasourceUrlType};
use datasources::native::access::NativeTableStorage;
use datasources::object_store::gcs::GcsStoreAccess;
use datasources::object_store::generic::GenericStoreAccess;
use datasources::object_store::http::HttpStoreAccess;
use datasources::object_store::local::LocalStoreAccess;
use datasources::object_store::s3::S3StoreAccess;
use datasources::object_store::{MultiSourceTableProvider, ObjStoreAccess};
use futures::TryStreamExt;
use datasources::object_store::{MultiSourceTableProvider, ObjStoreAccess, ObjStoreTableProvider};
use futures::{StreamExt, TryStreamExt};
use object_store::azure::AzureConfigKey;
use object_store::path::Path as ObjectStorePath;
use object_store::{ObjectMeta, ObjectStore};
use protogen::metastore::types::catalog::{FunctionType, RuntimePreference};
use protogen::metastore::types::options::{CredentialsOptions, StorageOptions};

use super::TableFunc;
use crate::functions::BuiltinFunction;
use crate::functions::{BuiltinFunction, ConstBuiltinFunction, TableFunc};

#[derive(Debug, Clone, Copy)]
pub struct ParquetOptionsReader;
@@ -123,7 +126,7 @@ pub trait OptionReader: Sync + Send + Sized {
/// List of options and their expected data types.
const OPTIONS: &'static [(&'static str, DataType)];

/// Read user provided options, and construct a file format usign those options.
/// Read user provided options, and construct a file format using those options.
fn read_options(opts: &HashMap<String, FuncParamValue>) -> Result<Self::Format>;
}

@@ -582,3 +585,125 @@ fn create_azure_store_access(
storage_options: opts,
}))
}

#[derive(Debug, Clone, Copy, Default)]
pub struct GlareDBUpload;

impl ConstBuiltinFunction for GlareDBUpload {
const NAME: &'static str = "glaredb_upload";
const DESCRIPTION: &'static str = "Reads a file that was uploaded to GlareDB Cloud.";
const EXAMPLE: &'static str = "SELECT * FROM glaredb_upload('my_upload.csv')";
const FUNCTION_TYPE: FunctionType = FunctionType::TableReturning;

// signature for GlareUpload is a single filename. The filename may
// optionally contain an extension, though it is not required. Filename
// should not be a path.
fn signature(&self) -> Option<Signature> {
Some(Signature::uniform(
1,
vec![DataType::Utf8],
Volatility::Stable,
))
}
}

#[async_trait]
impl TableFunc for GlareDBUpload {
fn detect_runtime(
&self,
_args: &[FuncParamValue],
_parent: RuntimePreference,
) -> Result<RuntimePreference> {
// Uploads can only exist remotely; this operation is not meaningful
// when not connected to remote/hybrid.
Ok(RuntimePreference::Remote)
}

async fn create_provider(
&self,
ctx: &dyn TableFuncContextProvider,
args: Vec<FuncParamValue>,
_opts: HashMap<String, FuncParamValue>,
) -> Result<Arc<dyn TableProvider>> {
if args.len() != 1 {
return Err(ExtensionError::InvalidNumArgs);
}

let session_state = ctx.get_session_state();

// NativeTableStorage is available on Remote ctx. Use store already
// constructed there.
let storage = ctx
.get_session_state()
.config()
.get_extension::<NativeTableStorage>()
.ok_or_else(|| {
ExtensionError::String(
format!(
"access unavailable, {} is not supported in local environments",
self.name(),
)
.to_string(),
)
})?;
let store = storage.store.clone(); // Cheap to clone

let file_name: String = args.into_iter().next().unwrap().try_into()?;
let ext = Path::new(&file_name)
.extension()
.ok_or_else(|| {
ExtensionError::String(
"missing file extension, supported: [.csv, .json, .parquet]".to_string(),
)
})?
.to_str()
.ok_or_else(|| {
ExtensionError::String(format!("unsupported file extension: {file_name}"))
})?
.to_lowercase();
let file_format: Arc<dyn FileFormat> = match ext.as_str() {
"csv" => Arc::new(CsvFormat::default().with_schema_infer_max_rec(Some(20480))),
"json" => Arc::new(JsonFormat::default()),
"parquet" => Arc::new(ParquetFormat::default()),
ext => {
return Err(ExtensionError::String(format!(
"unsupported file extension: {ext}"
)))
}
};

// This is required to read the object meta, but we unfortunately also
// need the base_url below for ObjStoreTableProvider impl. Maybe there's
// a refactor opportunity.
let prefix: ObjectStorePath =
format!("databases/{}/uploads/{}", storage.db_id(), file_name).into();
let base_url = format!("{}", storage.root_url);
let base_url = ObjectStoreUrl::parse(base_url)?;

// glaredb_upload currently does not support globbing, and therefore we
// do not need to iterate: there should only be one.
let mut objects = store.list(Some(&prefix));
let meta = match objects
.next()
.await
.ok_or_else(|| ExtensionError::String(format!("file not found: {}", file_name)))?
{
Ok(meta) => meta,
Err(e) => return Err(ExtensionError::ObjectStore(e.to_string())),
};
let objects: Vec<ObjectMeta> = vec![meta];

// Infer schema
let arrow_schema = file_format
.infer_schema(&session_state, &store.inner, &objects)
.await?;

return Ok(Arc::new(ObjStoreTableProvider::new(
store.inner.clone(),
arrow_schema,
base_url,
objects,
file_format,
)));
}
}
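Since detect_runtime always reports RuntimePreference::Remote and the provider is looked up from NativeTableStorage on the session config, running the function in a purely local session is expected to fail with the error constructed above. A sketch of that expected behavior (file name illustrative):

-- In a local (non-Cloud) session, NativeTableStorage is not registered, so:
SELECT * FROM glaredb_upload('my_upload.csv');
-- expected error: access unavailable, glaredb_upload is not supported in local environments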