Skip to content

Commit

Permalink
chore: integrate new rust sdk into python bindings (#2850)
Browse files Browse the repository at this point in the history
  • Loading branch information
tychoish authored Apr 18, 2024
1 parent 50b3428 commit 5107dd0
Show file tree
Hide file tree
Showing 12 changed files with 338 additions and 442 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bindings/python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pgrepr = { path = "../../crates/pgrepr" }
datafusion_ext = { path = "../../crates/datafusion_ext" }
arrow_util = { path = "../../crates/arrow_util" }
terminal_util = { path = "../../crates/terminal_util" }
glaredb = { path = "../../crates/glaredb" }
datafusion = { workspace = true, features = ["pyarrow"] }
tokio = { workspace = true }
thiserror = { workspace = true }
Expand Down
91 changes: 16 additions & 75 deletions bindings/python/src/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,50 +4,16 @@
//! queries.
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;

use futures::lock::Mutex;
use pyo3::prelude::*;
use sqlexec::engine::{Engine, EngineStorage};
use sqlexec::remote::client::RemoteClientType;
use url::Url;

use crate::connection::Connection;
use crate::environment::PyEnvironmentReader;
use crate::error::PyGlareDbError;
use crate::runtime::wait_for_future;

#[derive(Debug, Clone)]
struct PythonSessionConf {
/// Where to store both metastore and user data.
data_dir: Option<PathBuf>,
/// URL for cloud deployment to connect to.
cloud_url: Option<Url>,
}

impl From<Option<String>> for PythonSessionConf {
fn from(value: Option<String>) -> Self {
match value {
Some(s) => match Url::parse(&s) {
Ok(u) => PythonSessionConf {
data_dir: None,
cloud_url: Some(u),
},
// Assume failing to parse a url just means the user provided a local path.
Err(_) => PythonSessionConf {
data_dir: Some(PathBuf::from(s)),
cloud_url: None,
},
},
None => PythonSessionConf {
data_dir: None,
cloud_url: None,
},
}
}
}

/// Connect to a GlareDB database.
///
/// # Examples
Expand Down Expand Up @@ -88,48 +54,23 @@ pub fn connect(
storage_options: Option<HashMap<String, String>>,
) -> PyResult<Connection> {
wait_for_future(py, async move {
let conf = PythonSessionConf::from(data_dir_or_cloud_url);

let storage = if let Some(location) = location.clone() {
EngineStorage::Remote {
location,
options: storage_options.unwrap_or_default(),
}
} else if let Some(data_dir) = conf.data_dir.clone() {
EngineStorage::Local(data_dir)
} else {
EngineStorage::Memory
};

let mut engine = Engine::from_storage(storage)
.await
.map_err(PyGlareDbError::from)?;

engine = engine
.with_spill_path(spill_path.map(|p| p.into()))
.map_err(PyGlareDbError::from)?;

let mut session = engine
.default_local_session_context()
.await
.map_err(PyGlareDbError::from)?;

session
.create_client_session(
conf.cloud_url.clone(),
cloud_addr,
disable_tls,
RemoteClientType::Python,
None,
)
.await
.map_err(PyGlareDbError::from)?;

session.register_env_reader(Some(Arc::new(PyEnvironmentReader)));

Ok(Connection {
session: Arc::new(Mutex::new(session)),
_engine: Arc::new(engine),
inner: Arc::new(
glaredb::ConnectOptionsBuilder::default()
.connection_target(data_dir_or_cloud_url.clone())
.set_storage_options(storage_options)
.location(location)
.spill_path(spill_path)
.cloud_addr(cloud_addr)
.disable_tls(disable_tls)
.client_type(RemoteClientType::Python)
.environment_reader(Arc::new(PyEnvironmentReader))
.build()
.map_err(PyGlareDbError::from)?
.connect()
.await
.map_err(PyGlareDbError::from)?,
),
})
})
}
120 changes: 42 additions & 78 deletions bindings/python/src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,20 @@
use std::sync::Arc;

use datafusion::logical_expr::LogicalPlan as DFLogicalPlan;
use datafusion_ext::vars::SessionVars;
use futures::lock::Mutex;
use once_cell::sync::OnceCell;
use pyo3::prelude::*;
use pyo3::types::PyType;
use sqlexec::engine::{Engine, EngineStorage, SessionStorageConfig, TrackedSession};
use sqlexec::{LogicalPlan, OperationInfo};

use crate::execution_result::PyExecutionResult;

pub(super) type PyTrackedSession = Arc<Mutex<TrackedSession>>;
use sqlexec::remote::client::RemoteClientType;

use crate::environment::PyEnvironmentReader;
use crate::error::PyGlareDbError;
use crate::logical_plan::PyLogicalPlan;
use crate::execution::PyExecutionOutput;
use crate::runtime::wait_for_future;

/// A connected session to a GlareDB database.
#[pyclass]
#[derive(Clone)]
pub struct Connection {
pub(super) session: PyTrackedSession,
pub(super) _engine: Arc<Engine>,
pub(crate) inner: Arc<glaredb::Connection>,
}

impl Connection {
Expand All @@ -36,16 +28,15 @@ impl Connection {

let con = DEFAULT_CON.get_or_try_init(|| {
wait_for_future(py, async move {
let engine = Engine::from_storage(EngineStorage::Memory).await?;
let sess = engine
.new_local_session_context(
SessionVars::default(),
SessionStorageConfig::default(),
)
.await?;
Ok(Connection {
session: Arc::new(Mutex::new(sess)),
_engine: Arc::new(engine),
inner: Arc::new(
glaredb::ConnectOptionsBuilder::new_in_memory()
.client_type(RemoteClientType::Python)
.environment_reader(Arc::new(PyEnvironmentReader))
.build()?
.connect()
.await?,
),
}) as Result<_, PyGlareDbError>
})
})?;
Expand All @@ -56,12 +47,12 @@ impl Connection {

#[pymethods]
impl Connection {
fn __enter__(&mut self, _py: Python<'_>) -> PyResult<Self> {
fn __enter__(&self, _py: Python<'_>) -> PyResult<Self> {
Ok(self.clone())
}

fn __exit__(
&mut self,
&self,
py: Python<'_>,
_exc_type: Option<&PyType>,
_exc_value: Option<PyObject>,
Expand Down Expand Up @@ -110,39 +101,15 @@ impl Connection {
/// con = glaredb.connect()
/// con.sql('create table my_table (a int)').execute()
/// ```
pub fn sql(&mut self, py: Python<'_>, query: &str) -> PyResult<PyLogicalPlan> {
let cloned_sess = self.session.clone();
pub fn sql(&self, py: Python<'_>, query: &str) -> PyResult<PyExecutionOutput> {
wait_for_future(py, async move {
let mut sess = self.session.lock().await;

let plan = sess
.create_logical_plan(query)
Ok(self
.inner
.sql(query)
.evaluate()
.await
.map_err(PyGlareDbError::from)?;

let op = OperationInfo::new().with_query_text(query);

match plan
.to_owned()
.try_into_datafusion_plan()
.expect("resolving logical plan")
{
DFLogicalPlan::Extension(_)
| DFLogicalPlan::Dml(_)
| DFLogicalPlan::Ddl(_)
| DFLogicalPlan::Copy(_) => {
sess.execute_logical_plan(plan, &op)
.await
.map_err(PyGlareDbError::from)?;

Ok(PyLogicalPlan::new(
LogicalPlan::Noop,
cloned_sess,
Default::default(),
))
}
_ => Ok(PyLogicalPlan::new(plan, cloned_sess, op)),
}
.map_err(PyGlareDbError::from)?
.into())
})
}

Expand All @@ -159,14 +126,15 @@ impl Connection {
///
/// All operations execute lazily when their results are
/// processed.
pub fn prql(&mut self, py: Python<'_>, query: &str) -> PyResult<PyLogicalPlan> {
let cloned_sess = self.session.clone();
pub fn prql(&self, py: Python<'_>, query: &str) -> PyResult<PyExecutionOutput> {
wait_for_future(py, async move {
let mut sess = self.session.lock().await;
let plan = sess.prql_to_lp(query).await.map_err(PyGlareDbError::from)?;
let op = OperationInfo::new().with_query_text(query);

Ok(PyLogicalPlan::new(plan, cloned_sess, op))
Ok(self
.inner
.prql(query)
.evaluate()
.await
.map_err(PyGlareDbError::from)?
.into())
})
}

Expand All @@ -182,28 +150,24 @@ impl Connection {
/// con = glaredb.connect()
/// con.execute('create table my_table (a int)')
/// ```
pub fn execute(&mut self, py: Python<'_>, query: &str) -> PyResult<PyExecutionResult> {
let sess = self.session.clone();
let (_, exec_result) = wait_for_future(py, async move {
let mut sess = sess.lock().await;
let plan = sess
.create_logical_plan(query)
.await
.map_err(PyGlareDbError::from)?;

let op = OperationInfo::new().with_query_text(query);

sess.execute_logical_plan(plan, &op)
pub fn execute(&self, py: Python<'_>, query: &str) -> PyResult<PyExecutionOutput> {
wait_for_future(py, async move {
Ok(self
.inner
.execute(query)
.evaluate()
.await
.map_err(PyGlareDbError::from)
})?;

Ok(PyExecutionResult(exec_result))
.map_err(PyGlareDbError::from)?
.into())
})
}

/// Close the current session.
pub fn close(&mut self, _py: Python<'_>) -> PyResult<()> {
pub fn close(&self, _py: Python<'_>) -> PyResult<()> {
// TODO: Remove this method. No longer required.
//
// could we use this method to clear the environment/in memory
// database?
Ok(())
}
}
8 changes: 4 additions & 4 deletions bindings/python/src/environment.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::sync::Arc;

use datafusion::arrow::array::RecordBatch;
use datafusion::arrow::pyarrow::PyArrowType;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::{MemTable, TableProvider};
use pyo3::prelude::*;
use pyo3::types::{IntoPyDict, PyTuple, PyType};
use sqlexec::environment::EnvironmentReader;

use crate::logical_plan::PyLogicalPlan;
use crate::execution::PyExecutionOutput;

/// Read polars dataframes from the python environment.
#[derive(Debug, Clone, Copy)]
Expand Down Expand Up @@ -166,6 +166,6 @@ fn resolve_pandas(py: Python, var: &PyAny) -> PyResult<Option<Arc<dyn TableProvi
}

fn resolve_logical_plan(_py: Python, var: &PyAny) -> PyResult<Option<Arc<dyn TableProvider>>> {
let lp: PyLogicalPlan = var.extract()?;
Ok(Some(Arc::new(lp) as Arc<dyn TableProvider>))
let exec: PyExecutionOutput = var.extract()?;
Ok(Some(Arc::new(exec) as Arc<dyn TableProvider>))
}
20 changes: 18 additions & 2 deletions bindings/python/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::fmt::Display;

use datafusion::arrow::error::ArrowError;
use glaredb::DataFusionError;
use metastore::errors::MetastoreError;
use pyo3::exceptions::{PyException, PyRuntimeError};
use pyo3::{create_exception, PyErr};
Expand All @@ -10,12 +11,20 @@ use sqlexec::errors::ExecError;
pub enum PyGlareDbError {
#[error(transparent)]
Arrow(#[from] ArrowError),
#[error(transparent)]
Anyhow(#[from] anyhow::Error),
#[error(transparent)]
DataFusion(#[from] DataFusionError),
#[error(transparent)]
PyErr(#[from] PyErr),

#[error(transparent)]
Metastore(#[from] MetastoreError),
#[error(transparent)]
Exec(#[from] ExecError),
#[error(transparent)]
Anyhow(#[from] anyhow::Error),
ConfigurationBuilder(#[from] glaredb::ConnectOptionsBuilderError),

#[error("{0}")]
Other(String),
}
Expand All @@ -34,10 +43,17 @@ impl From<PyGlareDbError> for PyErr {
PyGlareDbError::Exec(err) => ExecutionException::new_err(err.to_string()),
PyGlareDbError::Anyhow(err) => PyRuntimeError::new_err(format!("{err:?}")),
PyGlareDbError::Other(msg) => PyRuntimeError::new_err(msg),
PyGlareDbError::DataFusion(err) => DataFusionErrorException::new_err(err.to_string()),
PyGlareDbError::PyErr(err) => err,
PyGlareDbError::ConfigurationBuilder(err) => {
ConfigurationException::new_err(err.to_string())
}
}
}
}

create_exception!(exceptions, ArrowErrorException, PyException);
create_exception!(exceptions, MetastoreException, PyException);
create_exception!(exceptions, ConfigurationException, PyException);
create_exception!(exceptions, DataFusionErrorException, PyException);
create_exception!(exceptions, ExecutionException, PyException);
create_exception!(exceptions, MetastoreException, PyException);
Loading

0 comments on commit 5107dd0

Please sign in to comment.