From 9e5e01bdca6c2c831649a3b887eff869df0a7063 Mon Sep 17 00:00:00 2001 From: Daniel Frank Date: Tue, 6 Feb 2018 14:40:35 -0800 Subject: [PATCH] add druid client (#39) --- omniduct/_version.py | 4 +++ omniduct/databases/druid.py | 64 +++++++++++++++++++++++++++++++++++++ omniduct/protocols.py | 1 + 3 files changed, 69 insertions(+) create mode 100644 omniduct/databases/druid.py diff --git a/omniduct/_version.py b/omniduct/_version.py index fe27bee..600960a 100644 --- a/omniduct/_version.py +++ b/omniduct/_version.py @@ -47,6 +47,10 @@ 'sqlalchemy' # Primary client ], + 'druid': [ + 'pydruid>=0.4.0', # Primary client + ], + # Filesystems 'webhdfs': [ 'pywebhdfs', # Primary client diff --git a/omniduct/databases/druid.py b/omniduct/databases/druid.py new file mode 100644 index 0000000..79e66dd --- /dev/null +++ b/omniduct/databases/druid.py @@ -0,0 +1,64 @@ +from __future__ import absolute_import + +from omniduct.utils.debug import logger + +from .base import DatabaseClient + + +class DruidClient(DatabaseClient): + + PROTOCOLS = ['druid'] + DEFAULT_PORT = 80 + + def _init(self): + self.__druid = None + + # Connection + def _connect(self): + from pydruid.db import connect + logger.info('Connecting to Druid database ...') + self.__druid = connect(self.host, self.port, path='/druid/v2/sql/', scheme='http') + + def _is_connected(self): + return self.__druid is not None + + def _disconnect(self): + logger.info('Disconnecting from Druid database ...') + try: + self.__druid.close() + except Exception: + pass + self.__druid = None + + # Querying + def _execute(self, statement, cursor=None, async=False): + cursor = cursor or self.__druid.cursor() + cursor.execute(statement) + return cursor + + def _table_list(self, schema=None, like=None, **kwargs): + cmd = "SELECT * FROM INFORMATION_SCHEMA.TABLES" + return self.query(cmd, **kwargs) + + def _table_exists(self, table, schema=None): + return (self.table_list(renew=True, schema=schema)['TABLE_NAME'] == table).any() + + def _table_desc(self, table, **kwargs): + query = (""" + SELECT + TABLE_SCHEMA + , TABLE_NAME + , COLUMN_NAME + , ORDINAL_POSITION + , COLUMN_DEFAULT + , IS_NULLABLE + , DATA_TYPE + FROM INFORMATION_SCHEMA.COLUMNS + WHERE TABLE_NAME = '{}'""").format(table) + return self.query(query, **kwargs) + + def _table_head(self, table, n=10, **kwargs): + return self.query("SELECT * FROM {} LIMIT {}".format(table, n), **kwargs) + + def _table_props(self, table, **kwargs): + raise NotImplementedError diff --git a/omniduct/protocols.py b/omniduct/protocols.py index b121e81..8dd151f 100644 --- a/omniduct/protocols.py +++ b/omniduct/protocols.py @@ -3,6 +3,7 @@ from .databases.presto import PrestoClient from .databases.sqlalchemy import SQLAlchemyClient from .databases.neo4j import Neo4jClient +from .databases.druid import DruidClient from .filesystems.local import LocalFsClient from .filesystems.s3 import S3Client from .filesystems.webhdfs import WebHdfsClient