Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add druid client #39

Merged
merged 1 commit into from
Feb 6, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions omniduct/_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@
'sqlalchemy' # Primary client
],

'druid': [
'pydruid>=0.4.0', # Primary client
],

# Filesystems
'webhdfs': [
'pywebhdfs', # Primary client
Expand Down
64 changes: 64 additions & 0 deletions omniduct/databases/druid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from __future__ import absolute_import

from omniduct.utils.debug import logger

from .base import DatabaseClient


# Database client for Druid; the methods below talk to Druid through a
# pydruid DB-API connection and its SQL endpoint.
class DruidClient(DatabaseClient):

# Protocol names this client declares.
PROTOCOLS = ['druid']
# Port constant for this client.
# NOTE(review): presumably used by the base class when no port is given -- confirm.
DEFAULT_PORT = 80

def _init(self):
    """Start with no Druid connection; `_connect` populates it later."""
    self.__druid = None

# Connection
def _connect(self):
    """
    Open a pydruid DB-API connection to this client's host and port.

    The connection targets the '/druid/v2/sql/' path over plain HTTP and
    is stored on the instance for later use.
    """
    # Imported lazily so pydruid is only required once a connection is made.
    from pydruid.db import connect as druid_connect

    logger.info('Connecting to Druid database ...')
    connection = druid_connect(
        self.host,
        self.port,
        path='/druid/v2/sql/',
        scheme='http'
    )
    self.__druid = connection

def _is_connected(self):
    """Return True when a connection object is currently held."""
    connection = self.__druid
    return connection is not None

# Close the Druid connection (if any) and mark this client as disconnected.
def _disconnect(self):
logger.info('Disconnecting from Druid database ...')
try:
self.__druid.close()
# Best-effort close: any exception raised by close() is deliberately
# ignored (this also covers the case where no connection object is held,
# since calling close() on None raises).
except Exception:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you know what the Druid client raises if it is not connected?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's sometimes buggy so I'm going to leave this as is for now
druid-io/pydruid#119

pass
# Always drop the reference so `_is_connected` reports False afterwards.
self.__druid = None

# Querying
def _execute(self, statement, cursor=None, async_=False, **kwargs):
    """
    Execute `statement` on a Druid cursor and return that cursor.

    Parameters:
        statement: The SQL text handed to `cursor.execute`.
        cursor: An existing cursor to reuse; when not supplied (or falsy),
            a new one is requested from the current connection.
        async_: Accepted for interface compatibility and ignored here.
            Formerly spelled `async`, which is a reserved keyword from
            Python 3.7 onwards and made this module fail to import with a
            SyntaxError. The positional slot is unchanged.
        **kwargs: Absorbs a legacy `async=...` keyword (and any other extra
            options) passed by callers; not used by this method.

    Returns:
        The cursor on which the statement was executed.
    """
    if not cursor:
        cursor = self.__druid.cursor()
    cursor.execute(statement)
    return cursor

def _table_list(self, schema=None, like=None, **kwargs):
    """
    Return every row of INFORMATION_SCHEMA.TABLES via `self.query`.

    `schema` and `like` are accepted but are not applied to the statement;
    remaining keyword arguments are forwarded to `self.query`.
    """
    statement = "SELECT * FROM INFORMATION_SCHEMA.TABLES"
    tables = self.query(statement, **kwargs)
    return tables

def _table_exists(self, table, schema=None):
    """
    Report whether `table` appears in a freshly fetched table listing.

    The listing is requested with `renew=True`, and its 'TABLE_NAME'
    column is compared against `table`.
    """
    listing = self.table_list(renew=True, schema=schema)
    name_matches = listing['TABLE_NAME'] == table
    return name_matches.any()

def _table_desc(self, table, **kwargs):
    """
    Describe the columns of `table` using INFORMATION_SCHEMA.COLUMNS.

    The table name is interpolated directly into the SQL text; extra
    keyword arguments are forwarded to `self.query`.
    """
    selected_columns = (
        'TABLE_SCHEMA',
        'TABLE_NAME',
        'COLUMN_NAME',
        'ORDINAL_POSITION',
        'COLUMN_DEFAULT',
        'IS_NULLABLE',
        'DATA_TYPE',
    )
    # One column per line, each continuation line prefixed with ", ".
    column_clause = "\n, ".join(selected_columns)
    statement = (
        "\nSELECT\n" + column_clause +
        "\nFROM INFORMATION_SCHEMA.COLUMNS" +
        "\nWHERE TABLE_NAME = '" + "{}".format(table) + "'"
    )
    return self.query(statement, **kwargs)

def _table_head(self, table, n=10, **kwargs):
    """
    Return the first `n` rows of `table` (10 by default).

    Extra keyword arguments are forwarded to `self.query`.
    """
    statement = "SELECT * FROM {} LIMIT {}".format(table, n)
    preview = self.query(statement, **kwargs)
    return preview

def _table_props(self, table, **kwargs):
    """Table properties are not implemented for this client."""
    raise NotImplementedError
1 change: 1 addition & 0 deletions omniduct/protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from .databases.presto import PrestoClient
from .databases.sqlalchemy import SQLAlchemyClient
from .databases.neo4j import Neo4jClient
from .databases.druid import DruidClient
from .filesystems.local import LocalFsClient
from .filesystems.s3 import S3Client
from .filesystems.webhdfs import WebHdfsClient
Expand Down