Migrate plugins
blythed committed Jul 29, 2024
1 parent f0964b0 commit 3e5948c
Showing 9 changed files with 202 additions and 132 deletions.
1 change: 0 additions & 1 deletion .github/workflows/ci_code.yml
@@ -10,7 +10,6 @@ on:
- "plugins/**"
- ".github/**"
- 'docs/**'
- 'examples/**'
- '*.md'
- '*.rst'
workflow_dispatch:
62 changes: 62 additions & 0 deletions .github/workflows/ci_code_mongodb.yml
@@ -0,0 +1,62 @@
name: Code Testing mongodb

on:
pull_request:
branches:
- main
paths: # Paths that may affect code quality
- "plugins/mongodb/**"

jobs:
# ---------------------------------
# Unit Testing
# ---------------------------------
unit_testing_mongodb:
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os: [ "ubuntu-latest" ]
python-version: ["3.11"]
steps:
- name: Checkout repository
uses: actions/checkout@v4

#---------------------------------------------------
# Configuring Python environments.
#
# We cache both the pip packages and the installation dir.
# If the pyproject remains unchanged, we re-use the existing installation dir.
# If the pyproject has changed, we reinstall everything using the cached pip packages.
- name: Cache Pip Packages
id: setup-python
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
cache: 'pip' # caching pip dependencies

- name: Check for test directory
id: check_test_dir
run: |
if [ ! -d "plugins/mongodb/test" ]; then
echo "Test directory does not exist. Cancelling workflow."
exit 1
fi
- name: Cache Python Installation
uses: actions/cache@v4
with:
path: ${{ env.pythonLocation }} # Cache the whole python installation dir.
key: ${{ matrix.os }}_python-${{ matrix.python-version }}_${{ hashFiles('pyproject.toml', '*/pyproject.toml') }}
#---------------------------------------------------

- name: Install SuperDuperDB Project
run: |
# Install core and testsuite dependencies on the cached python environment.
python -m pip install .
cd plugins/mongodb && python -m pip install .
- name: Run tests
run: |
python -m pip install pytest
cd plugins/mongodb && pytest test
2 changes: 2 additions & 0 deletions plugins/mongodb/pyproject.toml
@@ -19,6 +19,8 @@ keywords = [
requires-python = ">=3.10"
dynamic = ["version"]
dependencies = [
"mongomock",
"pymongo"
]

[project.urls]
File renamed without changes.
File renamed without changes.
62 changes: 62 additions & 0 deletions plugins/template/workflow.yml
@@ -0,0 +1,62 @@
name: Code Testing #PLUGIN

on:
pull_request:
branches:
- main
paths: # Paths that may affect code quality
- "plugins/#PLUGIN/**"

jobs:
# ---------------------------------
# Unit Testing
# ---------------------------------
unit_testing_#PLUGIN:
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os: [ "ubuntu-latest" ]
python-version: ["3.11"]
steps:
- name: Checkout repository
uses: actions/checkout@v4

#---------------------------------------------------
# Configuring Python environments.
#
# We cache both the pip packages and the installation dir.
# If the pyproject remains unchanged, we re-use the existing installation dir.
# If the pyproject has changed, we reinstall everything using the cached pip packages.
- name: Cache Pip Packages
id: setup-python
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
cache: 'pip' # caching pip dependencies

- name: Check for test directory
id: check_test_dir
run: |
if [ ! -d "plugins/#PLUGIN/test" ]; then
echo "Test directory does not exist. Cancelling workflow."
exit 1
fi
- name: Cache Python Installation
uses: actions/cache@v4
with:
path: ${{ env.pythonLocation }} # Cache the whole python installation dir.
key: ${{ matrix.os }}_python-${{ matrix.python-version }}_${{ hashFiles('pyproject.toml', '*/pyproject.toml') }}
#---------------------------------------------------

- name: Install SuperDuperDB Project
run: |
# Install core and testsuite dependencies on the cached python environment.
python -m pip install .
cd plugins/#PLUGIN && python -m pip install .
- name: Run tests
run: |
python -m pip install pytest
cd plugins/#PLUGIN && pytest test
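The workflow above is the template from which each plugin's CI file is produced: .github/workflows/ci_code_mongodb.yml earlier in this commit is exactly this file with every #PLUGIN placeholder replaced by mongodb. A minimal sketch of that substitution, assuming a plain string replacement is all that is required (the helper itself is hypothetical, not part of this commit):

```python
# Hypothetical helper: render a concrete CI workflow from the plugin
# template by substituting the #PLUGIN placeholder.
from pathlib import Path

def render_workflow(plugin: str) -> Path:
    template = Path("plugins/template/workflow.yml").read_text()
    target = Path(f".github/workflows/ci_code_{plugin}.yml")
    target.write_text(template.replace("#PLUGIN", plugin))
    return target

# render_workflow("mongodb") reproduces the ci_code_mongodb.yml added above.
```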
188 changes: 62 additions & 126 deletions superduper/base/build.py
@@ -1,99 +1,23 @@
import importlib
import re
import typing as t

from prettytable import PrettyTable

import superduper as s
from superduper import logging
from superduper.backends.base.backends import data_backends, metadata_stores
from superduper.backends.base.data_backend import BaseDataBackend, DataBackendProxy
from superduper.backends.base.data_backend import DataBackendProxy
from superduper.backends.base.metadata import MetaDataStoreProxy
from superduper.backends.local.artifacts import FileSystemArtifactStore
from superduper.backends.mongodb.artifacts import MongoArtifactStore
from superduper.base.datalayer import Datalayer
from superduper.misc.anonymize import anonymize_url
from superduper.misc.plugins import load_plugin


def _get_metadata_store(cfg):
# try to connect to the metadata store specified in the configuration.
logging.info("Connecting to Metadata Client:", cfg.metadata_store)
return _build_databackend_impl(cfg.metadata_store, metadata_stores, type='metadata')


def _build_metadata(cfg, databackend: t.Optional['BaseDataBackend'] = None):
# Connect to metadata store.
# ------------------------------
# 1. try to connect to the metadata store specified in the configuration.
# 2. if that fails, try to connect to the data backend engine.
# 3. if that fails, try to connect to the data backend uri.
if cfg.metadata_store is not None:
return _get_metadata_store(cfg)
else:
try:
# try to connect to the data backend engine.
assert isinstance(databackend, DataBackendProxy)
logging.info(
"Connecting to Metadata Client with engine: ", databackend.conn
)
return databackend.build_metadata()
except Exception as e:
logging.warn("Error building metadata from DataBackend:", str(e))
metadata = None

if metadata is None:
# try to connect to the data backend uri.
logging.info("Connecting to Metadata Client with URI: ", cfg.data_backend)
return _build_databackend_impl(
cfg.data_backend, metadata_stores, type='metadata'
)


def _build_databackend(cfg, databackend=None):
# Connect to data backend.
# ------------------------------
if not databackend:
databackend = _build_databackend_impl(cfg.data_backend, data_backends)
logging.info("Data Client is ready.", databackend.conn)
return databackend


def _build_artifact_store(
artifact_store: t.Optional[str] = None,
databackend: t.Optional['BaseDataBackend'] = None,
):
if not artifact_store:
assert isinstance(databackend, DataBackendProxy)
return databackend.build_artifact_store()

if artifact_store.startswith('mongodb://'):
import pymongo

conn: pymongo.MongoClient = pymongo.MongoClient(
'/'.join(artifact_store.split('/')[:-1])
)
name = artifact_store.split('/')[-1]
return MongoArtifactStore(conn, name)
elif artifact_store.startswith('filesystem://'):
directory = artifact_store.split('://')[1]
return FileSystemArtifactStore(directory)
else:
raise ValueError(f'Unknown artifact store: {artifact_store}')


class _MetaDataMatcher:
patterns = {
r'^mongodb:\/\/': ('mongodb', 'mongodb'),
r'^mongodb\+srv:\/\/': ('mongodb', 'atlas'),
r'^mongomock:\/\/': ('mongodb', 'mongomock'),
}
not_supported = [('sqlalchemy', 'pandas')]

class _Loader:
@classmethod
def create(cls, uri, mapping: t.Dict):
def create(cls, uri):
"""Helper method to create metadata backend."""
backend = 'sqlalchemy'
flavour = 'base'
backend = None
flavour = None
for pattern in cls.patterns:
if re.match(pattern, uri) is not None:
backend, flavour = cls.patterns[pattern]
@@ -102,62 +26,77 @@ def create(cls, uri, mapping: t.Dict):
f"{backend} with flavour {flavour} not supported "
"to create metadata store."
)
return mapping[backend](uri, flavour=flavour)
impl = getattr(load_plugin(f'superduper_{backend}'), cls.impl)
return impl(uri, flavour=flavour)
raise ValueError(f"No support for uri: {uri}")

return mapping[backend](uri)

class _MetaDataLoader(_Loader):
impl = 'MetadataStore'
patterns = {
r'^mongodb:\/\/': ('mongodb', 'mongodb'),
r'^mongodb\+srv:\/\/': ('mongodb', 'atlas'),
r'^mongomock:\/\/': ('mongodb', 'mongomock'),
r'^sqlite:\/\/': ('sqlalchemy', 'base'),
r'^postgres:\/\/': ('sqlalchemy', 'base'),
r'^snowflake:\/\/': ('sqlalchemy', 'base'),
r'^duckdb:\/\/': ('sqlalchemy', 'base'),
r'^mssql:\/\/': ('sqlalchemy', 'base'),
r'^mysql:\/\/': ('sqlalchemy', 'base'),
}

class _DataBackendMatcher(_MetaDataMatcher):
patterns = {**_MetaDataMatcher.patterns, r'.*\.csv$': ('ibis', 'pandas')}

@classmethod
def create(cls, uri, mapping: t.Dict):
"""Helper method to create databackend."""
backend = 'ibis'
for pattern in cls.patterns:
if re.match(pattern, uri) is not None:
backend, flavour = cls.patterns[pattern]
class _DataBackendLoader(_Loader):
impl = 'DataBackend'
patterns = {
r'^mongodb:\/\/': ('mongodb', 'mongodb'),
r'^mongodb\+srv:\/\/': ('mongodb', 'atlas'),
r'^mongomock:\/\/': ('mongodb', 'mongomock'),
r'^sqlite://': ('ibis', 'base'),
r'^postgres://': ('ibis', 'base'),
r'^duckdb://': ('ibis', 'base'),
r'^mssql://': ('ibis', 'base'),
r'^mysql://': ('ibis', 'base'),
r'.*\*.csv$': ('ibis', 'pandas'),
}

return mapping[backend](uri, flavour=flavour)

return mapping[backend](uri, flavour='base')
class _ArtifactStoreLoader(_Loader):
impl = 'ArtifactStore'
patterns = {
r'^filesystem:\/\/': ('local', 'base'),
r'^mongodb\+srv:\/\/': ('mongodb', 'atlas'),
r'^mongodb:\/\/': ('mongodb', 'base'),
}
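The three loader subclasses differ only in `impl` and in their `patterns` tables; the shared `create` walks the regex table, loads the matching `superduper_<backend>` plugin, and instantiates the class named by `impl` with the URI and flavour. A standalone sketch of that dispatch, assuming `load_plugin` behaves roughly like `importlib.import_module` (the not-supported check from the diff is omitted):

```python
import importlib
import re

# Sketch of the dispatch in _Loader.create: regex-match the URI,
# import the backend plugin, and build the class named by `impl`.
def create(uri: str, patterns: dict, impl: str):
    for pattern, (backend, flavour) in patterns.items():
        if re.match(pattern, uri) is not None:
            plugin = importlib.import_module(f"superduper_{backend}")
            return getattr(plugin, impl)(uri, flavour=flavour)
    raise ValueError(f"No support for uri: {uri}")

# With _MetaDataLoader.patterns and impl='MetadataStore', the URI
# 'mongodb://localhost:27017/test' matches r'^mongodb:\/\/', so this imports
# superduper_mongodb and returns MetadataStore(uri, flavour='mongodb').
```

This is also why plugins/mongodb/pyproject.toml above now declares pymongo and mongomock directly: after the migration, each plugin owns its backend dependencies instead of the core package.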


# Helper function to build a data backend based on the URI.
def _build_databackend_impl(uri, mapping, type: str = 'data_backend'):
logging.debug(f"Parsing data connection URI:{uri}")
if type == 'data_backend':
db = DataBackendProxy(_DataBackendMatcher.create(uri, mapping))
else:
db = MetaDataStoreProxy(_MetaDataMatcher.create(uri, mapping))
def _build_artifact_store(uri):
return _ArtifactStoreLoader.create(uri)


def _build_databackend(uri):
return DataBackendProxy(_DataBackendLoader.create(uri))


def _build_metadata(uri):
db = MetaDataStoreProxy(_MetaDataLoader.create(uri))
return db


# TODO why public unlike others
def build_compute(cfg):
"""
Helper function to build compute backend.
:param cfg: SuperDuper config.
"""

def _local_component(path, **kwargs):
spath = path.split('.')
path, cls = '.'.join(spath[:-1]), spath[-1]
module = importlib.import_module(path)
component_cls = getattr(module, cls)
return component_cls(**kwargs)
plugin = load_plugin(cfg.cluster.compute.backend)
queue_publisher = plugin.QueuePublisher(cfg.cluster.queue.uri)
return plugin.ComputeBackend(cfg.cluster.compute.uri, queue=queue_publisher)

compute = cfg.cluster.compute
queue = cfg.cluster.queue
logging.info("Connecting to compute client:", compute)
path = compute._path or 'superduper.backends.local.compute.LocalComputeBackend'
queue_path = queue._path or 'superduper.jobs.queue.LocalSequentialQueue'
queue = _local_component(queue_path, uri=queue.uri)

return _local_component(path, uri=compute.uri, queue=queue)


def build_datalayer(cfg=None, databackend=None, **kwargs) -> Datalayer:
def build_datalayer(cfg=None, **kwargs) -> Datalayer:
"""
Build a Datalayer object as per ``db = superduper(db)`` from configuration.
@@ -170,17 +109,14 @@ def build_datalayer(cfg=None, databackend=None, **kwargs) -> Datalayer:
# ------------------------------
# Use the provided configuration or fall back to the default configuration.
cfg = (cfg or s.CFG)(**kwargs)

databackend = _build_databackend(cfg, databackend)
metadata = _build_metadata(cfg, databackend)
assert metadata

artifact_store = _build_artifact_store(cfg.artifact_store, databackend)
databackend_obj = _build_databackend(cfg.data_backend)
metadata_obj = _build_metadata(cfg.metadata_store or cfg.data_backend)
artifact_store = _build_artifact_store(cfg.artifact_store or cfg.data_backend)
compute = build_compute(cfg)

datalayer = Datalayer(
databackend=databackend,
metadata=metadata,
databackend=databackend_obj,
metadata=metadata_obj,
artifact_store=artifact_store,
compute=compute,
)
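With the matcher classes and `_build_databackend_impl` gone, every Datalayer component now resolves from a URI alone, with the metadata store and artifact store falling back to the data-backend URI when not configured separately. A hedged usage sketch (the URI is a placeholder and a reachable MongoDB is assumed; `mongodb://` appears in all three loader tables, so every store resolves through the superduper_mongodb plugin):

```python
import superduper as s
from superduper.base.build import build_datalayer

# Placeholder URI: matched by r'^mongodb:\/\/' in all three loader tables.
s.CFG.data_backend = 'mongodb://localhost:27017/test'

# metadata_store and artifact_store are unset, so both fall back to the
# same URI via the `or cfg.data_backend` defaults in build_datalayer.
db = build_datalayer(s.CFG)
```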