Skip to content

Commit

Permalink
Migrate to new backend API
Browse files Browse the repository at this point in the history
  • Loading branch information
kszucs committed Nov 15, 2021
1 parent f05e726 commit 7ca568b
Show file tree
Hide file tree
Showing 4 changed files with 195 additions and 227 deletions.
119 changes: 112 additions & 7 deletions ibis/backends/datafusion/__init__.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
# from pkg_resources import parse_version

import re
from typing import Mapping

import datafusion as df
import pyarrow as pa

import ibis.common.exceptions as com
import ibis.expr.schema as sch
import ibis.expr.types as ir
from ibis.backends.base import BaseBackend

from .client import DataFusionClient, DataFusionDatabase, DataFusionTable
from .compiler import DataFusionExprTranslator

# TODO(kszucs): support nested and parametric types
# consolidate with the logic from the parquet backend


class Backend(BaseBackend):
name = 'datafusion'
builder = None
client_class = DataFusionClient
database_class = DataFusionDatabase
table_class = DataFusionTable
translator_class = DataFusionExprTranslator

def version(self):
Expand All @@ -30,13 +38,110 @@ def connect(self, config):
-------
DataFusionClient
"""
return self.client_class(backend=self, config=config)
new_backend = self.__class__()
if isinstance(config, df.ExecutionContext):
new_backend.context = config
else:
new_backend.context = df.ExecutionContext()

for name, path in config.items():
strpath = str(path)
if strpath.endswith('.csv'):
new_backend.register_csv(name, path)
elif strpath.endswith('.parquet'):
new_backend.register_parquet(name, path)
else:
raise ValueError('Wrong format')

return new_backend

def current_database(self):
raise NotImplementedError("")

def list_databases(self):
raise NotImplementedError("")

def list_tables(self):
raise NotImplementedError("")
def list_tables(self, like=None):
"""List the available tables."""
tables = list(self.context.tables())
if like is not None:
pattern = re.compile(like)
return list(filter(lambda t: pattern.findall(t), tables))
return tables

# def database(self, name='public'):
# '''Construct a database called `name`.'''
# catalog = self.context.catalog()
# database = catalog.database(name)
# return self.database_class(name, self)

def table(self, name, schema=None):
catalog = self.context.catalog()
database = catalog.database('public')
table = database.table(name)
schema = sch.infer(table.schema)
return self.table_class(name, schema, self).to_expr()

def register_csv(self, name, path, schema=None):
self.context.register_csv(name, path, schema=schema)

def register_parquet(self, name, path, schema=None):
self.context.register_parquet(name, path, schema=schema)

def execute(
self,
expr: ir.Expr,
params: Mapping[ir.Expr, object] = None,
limit: str = 'default',
**kwargs,
):
def _collect(frame):
batches = frame.collect()
if batches:
table = pa.Table.from_batches(batches)
else:
# TODO(kszucs): file a bug to datafusion because the fields'
# nullability from frame.schema() is not always consistent
# with the first record batch's schema
table = pa.Table.from_batches(batches, schema=frame.schema())
return table.to_pandas()

if isinstance(expr, ir.TableExpr):
frame = self.compile(expr, params, **kwargs)
return _collect(frame)
elif isinstance(expr, ir.ColumnExpr):
# expression must be named for the projection
expr = expr.name('tmp').to_projection()
frame = self.compile(expr, params, **kwargs)
return _collect(frame)['tmp']
elif isinstance(expr, ir.ScalarExpr):
if expr.op().root_tables():
# there are associated datafusion tables so convert the expr
# to a selection which we can directly convert to a datafusion
# plan
expr = expr.name('tmp').to_projection()
frame = self.compile(expr, params, **kwargs)
else:
# doesn't have any tables associated so create a plan from a
# dummy datafusion table
compiled = self.compile(expr, params, **kwargs)
frame = self.context.empty_table().select(compiled)
return _collect(frame).iloc[0, 0]
else:
raise com.IbisError(
f"Cannot execute expression of type: {type(expr)}"
)

def compile(
self, expr: ir.Expr, params: Mapping[ir.Expr, object] = None, **kwargs
):
"""Compile `expr`.
Notes
-----
For the dask backend returns a dask graph that you can run ``.compute``
on to get a pandas object.
"""
translator = self.translator_class()
return translator.translate(expr)
214 changes: 0 additions & 214 deletions ibis/backends/datafusion/client.py

This file was deleted.

Loading

0 comments on commit 7ca568b

Please sign in to comment.