diff --git a/README b/README
deleted file mode 100644
index 7294b4b8..00000000
--- a/README
+++ /dev/null
@@ -1,24 +0,0 @@
-===========
-pyDruid
-===========
-
-pyDruid provides a python interface to the Druid analytic store. Typical usage
-often looks like this::
-
-    #!/usr/bin/env python
-
-    from pyDruid import *
-
-    # Druid Config
-    endpoint = 'druid/v2/?pretty'
-    demo_bard_url = 'http://localhost:8083'
-    dataSource = 'wikipedia'
-    intervals = ["2013-01-01/p1y"]
-
-    query = pyDruid(demo_bard_url, endpoint)
-
-    counts = query.timeseries(dataSource = dataSource,
-                              granularity = "minute",
-                              intervals = intervals,
-                              aggregations = {"count" : doubleSum("edits")}
-                              )
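The README call above maps onto a plain Druid JSON body. A hand-traced sketch of the dict that timeseries() in the client below would build from these arguments, not output captured from a live broker::

    # Sketch: dataSource, granularity, and intervals pass through unchanged;
    # build_aggregators merges the dict key "count" into the doubleSum spec.
    query_dict = {
        "queryType": "timeseries",
        "dataSource": "wikipedia",
        "granularity": "minute",
        "intervals": ["2013-01-01/p1y"],
        "aggregations": [
            {"name": "count", "type": "doubleSum", "fieldName": "edits"}
        ],
    }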
diff --git a/build/lib/pydruid/__init__.py b/build/lib/pydruid/__init__.py
deleted file mode 100644
index e69de29b..00000000
diff --git a/build/lib/pydruid/client.py b/build/lib/pydruid/client.py
deleted file mode 100644
index 3a2bc840..00000000
--- a/build/lib/pydruid/client.py
+++ /dev/null
@@ -1,222 +0,0 @@
-#
-# Copyright 2013 Metamarkets Group Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-from __future__ import division
-import urllib2
-import simplejson as json
-import csv
-import re
-import os
-import sys
-import pandas
-import dateutil.parser
-from matplotlib import *
-from matplotlib.pyplot import *
-from utils.aggregators import *
-from utils.postaggregator import *
-from utils.filters import *
-from utils.query_utils import *
-
-
-class pyDruid:
-
-    def __init__(self,url,endpoint):
-        self.url = url
-        self.endpoint = endpoint
-        self.result = None
-        self.result_json = None
-        self.query_type = None
-
-    # serializes a dict representation of the query into a json
-    # object, and sends it to bard via post, and gets back the
-    # json representation of the query result
-    def post(self,query):
-        querystr = json.dumps(query)
-        url = self.url + '/' + self.endpoint
-        headers = {'Content-Type' : 'application/json'}
-        req = urllib2.Request(url, querystr, headers)
-        res = urllib2.urlopen(req)
-        data = res.read()
-        self.result_json = data;
-        self.querystr = querystr
-        res.close()
-
-    # de-serializes the data returned from druid into a
-    # list of dicts
-    def parse(self):
-        if self.result_json:
-            res = json.loads(self.result_json)
-            return res
-        else:
-            print("Empty query")
-            return None
-
-    def export_tsv(self,dest_path):
-
-        f = open(dest_path,'wb')
-        tsv_file = csv.writer(f, delimiter = '\t')
-
-        if(self.query_type == "timeseries"):
-            header = self.result[0]['result'].keys()
-            header.append('timestamp')
-        elif(self.query_type == "groupby"):
-            header = self.result[0]['event'].keys()
-            header.append('timestamp')
-            header.append('version')
-
-        tsv_file.writerow(header)
-
-        # use unicodewriter to encode results in unicode
-        w = query_utils.UnicodeWriter(f)
-
-        if self.result:
-            if self.query_type == "timeseries":
-                for item in self.result:
-                    timestamp = item['timestamp']
-                    result = item['result']
-
-                    if type(result) is list:
-                        for line in result:
-                            w.writerow(line.values() + [timestamp])
-                    else: #timeseries
-                        w.writerow(result.values() + [timestamp])
-            elif self.query_type == "groupby":
-                for item in self.result:
-                    timestamp = item['timestamp']
-                    version = item['version']
-                    w.writerow(item['event'].values() + [timestamp] + [version])
-
-        f.close()
-
-    # Exports a JSON query object into a Pandas data-frame
-    def export_pandas(self):
-        if self.result:
-            if self.query_type == "timeseries":
-                nres = [v['result'].items() + [('timestamp',v['timestamp'])] for v in self.result]
-                nres = [dict(v) for v in nres]
-            else:
-                print('not implemented yet')
-                return None
-
-            df = pandas.DataFrame(nres)
-            df['timestamp'] = df['timestamp'].map(lambda x: dateutil.parser.parse(x))
-            df['t'] = dates.date2num(df['timestamp'])
-            return df
-
-    # implements a timeseries query
-    def timeseries(self, **args):
-
-        query_dict = {"queryType" : "timeseries"}
-        valid_parts = ['dataSource', 'granularity', 'filter', 'aggregations', 'postAggregations', 'intervals']
-
-        for key, val in args.iteritems():
-            if key not in valid_parts:
-                raise ValueError('{0} is not a valid query component. The list of valid components is: \n {1}'.format(key, valid_parts))
-            elif key == "aggregations" :
-                query_dict[key] = build_aggregators(val)
-            elif key == "postAggregations":
-                query_dict[key] = build_post_aggregators(val)
-            elif key == "filter":
-                query_dict[key] = val.filter['filter']
-            else:
-                query_dict[key] = val
-
-        self.query_dict = query_dict
-
-        try:
-            self.post(query_dict)
-        except urllib2.HTTPError, e:
-            raise IOError('Malformed query: \n {0}'.format(json.dumps(self.query_dict ,indent = 4)))
-        else:
-            self.result = self.parse()
-            self.query_type = "timeseries"
-            return self.result
-
-    # Implements a groupBy query
-    def groupBy(self, **args):
-
-        query_dict = {"queryType" : "groupBy"}
-        valid_parts = ['dataSource', 'granularity', 'filter', 'aggregations', 'postAggregations',
-                       'intervals', 'dimensions']
-
-        for key, val in args.iteritems():
-            if key not in valid_parts:
-                raise ValueError('{0} is not a valid query component. The list of valid components is: \n {1}'.format(key, valid_parts))
-            elif key == "aggregations" :
-                query_dict[key] = build_aggregators(val)
-            elif key == "postAggregations":
-                query_dict[key] = build_post_aggregators(val)
-            elif key == "filter":
-                query_dict[key] = val.filter['filter']
-            else:
-                query_dict[key] = val
-
-        self.query_dict = query_dict
-
-        try:
-            self.post(query_dict)
-        except urllib2.HTTPError, e:
-            raise IOError('Malformed query: \n {0}'.format(json.dumps(self.query_dict ,indent = 4)))
-        else:
-            self.result = self.parse()
-            self.query_type = "groupby"
-            return self.parse()
-
-    # Implements a segmentMetadata query. This query type is for pulling the tsv-equivalent
-    # size and cardinality broken out by dimension in a specified data source.
-    def segmentMetadata(self, **args):
-
-        query_dict = {"queryType" : "segmentMetadata"}
-        valid_parts = ['dataSource', 'intervals']
-
-        for key, val in args.iteritems():
-            if key not in valid_parts:
-                raise ValueError('{0} is not a valid query component. The list of valid components is: \n {1}'.format(key, valid_parts))
-            else:
-                query_dict[key] = val
-
-        self.query_dict = query_dict
-
-        try:
-            self.post(query_dict)
-        except urllib2.HTTPError, e:
-            raise IOError('Malformed query: \n {0}'.format(json.dumps(self.query_dict ,indent = 4)))
-        else:
-            self.result = self.parse()
-            return self.parse()
-
-    # implements a time boundary query
-    def timeBoundary(self, **args):
-
-        query_dict = {"queryType" : "timeBoundary"}
-        valid_parts = ['dataSource']
-
-        for key, val in args.iteritems():
-            if key not in valid_parts:
-                raise ValueError('{0} is not a valid query component. The list of valid components is: \n {1}'.format(key, valid_parts))
-            else:
-                query_dict[key] = val
-
-        try:
-            self.post(query_dict)
-        except urllib2.HTTPError, e:
-            raise IOError('Malformed query: \n {0}'.format(json.dumps(self.query_dict ,indent = 4)))
-        else:
-            self.result = self.parse()
-            return self.parse()
-
-    # prints internal variables of the object
-    def describe(self):
-        print("url: " + self.url)
diff --git a/build/lib/pydruid/utils/__init__.py b/build/lib/pydruid/utils/__init__.py
deleted file mode 100644
index e69de29b..00000000
diff --git a/build/lib/pydruid/utils/aggregators.py b/build/lib/pydruid/utils/aggregators.py
deleted file mode 100644
index efd54f42..00000000
--- a/build/lib/pydruid/utils/aggregators.py
+++ /dev/null
@@ -1,37 +0,0 @@
-#
-# Copyright 2013 Metamarkets Group Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-#create a json list of user-defined aggregations
-
-# Define the support aggregation types
-
-def longSum(raw_metric):
-    return {"type" : "longSum", "fieldName" : raw_metric}
-
-def doubleSum(raw_metric):
-    return {"type" : "doubleSum", "fieldName" : raw_metric}
-
-def min(raw_metric):
-    return {"type" : "min", "fieldName" : raw_metric}
-
-def max(raw_metric):
-    return {"type" : "max", "fieldName" : raw_metric}
-
-def count(raw_metric):
-    return {"type" : "count", "fieldName" : raw_metric}
-
-def build_aggregators(agg_input):
-    return [dict([('name',k)] + v.items()) for (k,v) in agg_input.iteritems()]
\ No newline at end of file
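To make build_aggregators concrete, a hand-traced sketch of its behavior on the README's aggregation spec (Python 2, per the hunk above; the import path is the one this build/ copy used). Note that min, max, and count shadow Python builtins of the same name::

    from utils.aggregators import doubleSum, build_aggregators

    # The dict key becomes the aggregation's "name"; the helper's
    # spec supplies "type" and "fieldName".
    build_aggregators({"count": doubleSum("edits")})
    # -> [{'name': 'count', 'type': 'doubleSum', 'fieldName': 'edits'}]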
diff --git a/build/lib/pydruid/utils/filters.py b/build/lib/pydruid/utils/filters.py
deleted file mode 100644
index 562cf78d..00000000
--- a/build/lib/pydruid/utils/filters.py
+++ /dev/null
@@ -1,58 +0,0 @@
-#
-# Copyright 2013 Metamarkets Group Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-class Filter:
-
-    def __init__(self, **args):
-
-        # constrct a selector
-        if 'type' not in args.keys():
-            self.filter = {"filter": {"type": "selector",
-                                      "dimension" : args["dimension"],
-                                      "value" : args["value"]}}
-        # construct an and filter
-        elif args["type"] == "and":
-            self.filter = {"filter": {"type" : "and",
-                                      "fields" : args["fields"]}}
-        # construct an or filter
-        elif args["type"] == "or":
-            self.filter = {"filter": {"type" : "or",
-                                      "fields" : args["fields"]}}
-        # construct a not filter
-        elif args["type"] == "not":
-            self.filter = {"filter" : {"type" : "not",
-                                       "field" : args["field"]}}
-        else:
-            print("you've slain me. nevermind the teeth and the fingernails. the show must go on.")
-
-    def show(self):
-        print(json.dumps(self.filter, indent = 4))
-
-    def __and__(self, x):
-        return Filter(type = "and", fields = [self.filter['filter'], x.filter['filter']])
-
-    def __or__(self,x):
-        return Filter(type = "or", fields = [self.filter['filter'], x.filter['filter']])
-
-    def __invert__(self):
-        return Filter(type = "not", field = self.filter['filter'])
-
-class Dimension:
-
-    def __init__(self, dim):
-        self.dimension = dim
-
-    def __eq__(self,other):
-        return Filter(dimension = self.dimension, value = other)
\ No newline at end of file
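Filter composition in this module runs entirely through the overloaded operators; show(), meanwhile, relies on a json module this file never imports. A hand-traced sketch of what __eq__ and __or__ produce, not captured output::

    d = Dimension("namespace")
    f = (d == "article") | (d == "wikipedia")

    # f.filter now holds the nested Druid filter spec:
    # {"filter": {"type": "or",
    #             "fields": [{"type": "selector", "dimension": "namespace", "value": "article"},
    #                        {"type": "selector", "dimension": "namespace", "value": "wikipedia"}]}}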
diff --git a/build/lib/pydruid/utils/postaggregator.py b/build/lib/pydruid/utils/postaggregator.py
deleted file mode 100644
index 0d647850..00000000
--- a/build/lib/pydruid/utils/postaggregator.py
+++ /dev/null
@@ -1,56 +0,0 @@
-#
-# Copyright 2013 Metamarkets Group Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-import pprint as pp
-import re
-
-# determine whether arguments are strings or numeric constants
-def is_numeric(field):
-    for obj in [int, float, long]:
-        try:
-            obj(field)
-            return True
-        except:
-            pass
-
-    return False
-
-# specify field type as source field name (string) or constant and build json
-def field_spec(field):
-    if is_numeric(field):
-        return {"type" : "constant",
-                "name" : "constant_%d" % int(field),
-                "value" : field}
-    else:
-        return {"type" : "fieldAccess",
-                "fieldName" : field}
-
-# use regex to extract fields and operand from user-submitted postAgg string
-def parse_calc(postAggOpString):
-    fields = []
-    rgx = "(.+)[ ]?([*+/-])[ ]?(.+)"
-    postAggOp = re.findall(rgx, postAggOpString).pop()
-
-    if postAggOp:
-        operation = postAggOp[0]
-        fn = postAggOp[1]
-        fields.append(field_spec(postAggOp[0]))
-        fields.append(field_spec(postAggOp[2]))
-        base = {"type" : "arithmetic",
-                "fn" : fn,
-                "fields" : fields}
-        return base
-    else:
-        raise Exception("Not a valid postAggregation operation")
\ No newline at end of file
diff --git a/build/lib/pydruid/utils/query_utils.py b/build/lib/pydruid/utils/query_utils.py
deleted file mode 100644
index 4524e432..00000000
--- a/build/lib/pydruid/utils/query_utils.py
+++ /dev/null
@@ -1,43 +0,0 @@
-#
-# Copyright 2013 Metamarkets Group Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-import csv, codecs, cStringIO
-
-# A special CSV writer which will write rows to TSV file "f", which is encoded in utf-8.
-# this is necessary because the values in druid are not all ASCII.
-class UnicodeWriter:
-
-    def __init__(self, f, dialect="excel-tab", encoding="utf-8", **kwds): #delimiter="\t"
-        # Redirect output to a queue
-        self.queue = cStringIO.StringIO()
-        self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
-        self.stream = f
-        self.encoder = codecs.getincrementalencoder(encoding)()
-
-    def writerow(self, row):
-        self.writer.writerow([s.encode("utf-8") if isinstance(s, unicode) else s for s in row])
-        # Fetch UTF-8 output from the queue ...
-        data = self.queue.getvalue()
-        data = data.decode("utf-8")
-        # ... and reencode it into the target encoding
-        data = self.encoder.encode(data)
-        # write to the target stream
-        self.stream.write(data)
-        # empty queue
-        self.queue.truncate(0)
-
-    def writerows(self, rows):
-        for row in rows:
-            self.writerow(row)
\ No newline at end of file
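parse_calc, two hunks up, splits a post-aggregation expression on its operator via the regex shown there; the operation variable it binds is never used afterwards. A hand-traced sketch of one call, not captured output::

    # The greedy first group settles on the rightmost operator,
    # and field_spec maps each non-numeric side to a fieldAccess.
    parse_calc("tot/count")
    # -> {"type": "arithmetic",
    #     "fn": "/",
    #     "fields": [{"type": "fieldAccess", "fieldName": "tot"},
    #                {"type": "fieldAccess", "fieldName": "count"}]}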
diff --git a/dist/pyDruid-0.01.tar.gz b/dist/pyDruid-0.01.tar.gz
deleted file mode 100644
index 09fcbe27..00000000
Binary files a/dist/pyDruid-0.01.tar.gz and /dev/null differ
diff --git a/dist/pyDruid-0.1.0.tar.gz b/dist/pyDruid-0.1.0.tar.gz
deleted file mode 100644
index 5f176dfe..00000000
Binary files a/dist/pyDruid-0.1.0.tar.gz and /dev/null differ
diff --git a/pyDruid.py b/pyDruid.py
deleted file mode 100755
index 51207f27..00000000
--- a/pyDruid.py
+++ /dev/null
@@ -1,222 +0,0 @@
-#
-# Copyright 2013 Metamarkets Group Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-from __future__ import division
-import urllib2
-import simplejson as json
-import csv
-import re
-import os
-import sys
-import pandas
-import dateutil.parser
-from matplotlib import *
-from matplotlib.pyplot import *
-from pyDruidUtils.aggregators import *
-from pyDruidUtils.postaggregator import *
-from pyDruidUtils.filters import *
-from pyDruidUtils.query_utils import *
-
-
-class pyDruid:
-
-    def __init__(self,url,endpoint):
-        self.url = url
-        self.endpoint = endpoint
-        self.result = None
-        self.result_json = None
-        self.query_type = None
-
-    # serializes a dict representation of the query into a json
-    # object, and sends it to bard via post, and gets back the
-    # json representation of the query result
-    def post(self,query):
-        querystr = json.dumps(query)
-        url = self.url + '/' + self.endpoint
-        headers = {'Content-Type' : 'application/json'}
-        req = urllib2.Request(url, querystr, headers)
-        res = urllib2.urlopen(req)
-        data = res.read()
-        self.result_json = data;
-        self.querystr = querystr
-        res.close()
-
-    # de-serializes the data returned from druid into a
-    # list of dicts
-    def parse(self):
-        if self.result_json:
-            res = json.loads(self.result_json)
-            return res
-        else:
-            print("Empty query")
-            return None
-
-    def export_tsv(self,dest_path):
-
-        f = open(dest_path,'wb')
-        tsv_file = csv.writer(f, delimiter = '\t')
-
-        if(self.query_type == "timeseries"):
-            header = self.result[0]['result'].keys()
-            header.append('timestamp')
-        elif(self.query_type == "groupby"):
-            header = self.result[0]['event'].keys()
-            header.append('timestamp')
-            header.append('version')
-
-        tsv_file.writerow(header)
-
-        # use unicodewriter to encode results in unicode
-        w = query_utils.UnicodeWriter(f)
-
-        if self.result:
-            if self.query_type == "timeseries":
-                for item in self.result:
-                    timestamp = item['timestamp']
-                    result = item['result']
-
-                    if type(result) is list:
-                        for line in result:
-                            w.writerow(line.values() + [timestamp])
-                    else: #timeseries
-                        w.writerow(result.values() + [timestamp])
-            elif self.query_type == "groupby":
-                for item in self.result:
-                    timestamp = item['timestamp']
-                    version = item['version']
-                    w.writerow(item['event'].values() + [timestamp] + [version])
-
-        f.close()
-
-    # Exports a JSON query object into a Pandas data-frame
-    def export_pandas(self):
-        if self.result:
-            if self.query_type == "timeseries":
-                nres = [v['result'].items() + [('timestamp',v['timestamp'])] for v in self.result]
-                nres = [dict(v) for v in nres]
-            else:
-                print('not implemented yet')
-                return None
-
-            df = pandas.DataFrame(nres)
-            df['timestamp'] = df['timestamp'].map(lambda x: dateutil.parser.parse(x))
-            df['t'] = dates.date2num(df['timestamp'])
-            return df
-
-    # implements a timeseries query
-    def timeseries(self, **args):
-
-        query_dict = {"queryType" : "timeseries"}
-        valid_parts = ['dataSource', 'granularity', 'filter', 'aggregations', 'postAggregations', 'intervals']
-
-        for key, val in args.iteritems():
-            if key not in valid_parts:
-                raise ValueError('{0} is not a valid query component. The list of valid components is: \n {1}'.format(key, valid_parts))
-            elif key == "aggregations" :
-                query_dict[key] = build_aggregators(val)
-            elif key == "postAggregations":
-                query_dict[key] = build_post_aggregators(val)
-            elif key == "filter":
-                query_dict[key] = val.filter['filter']
-            else:
-                query_dict[key] = val
-
-        self.query_dict = query_dict
-
-        try:
-            self.post(query_dict)
-        except urllib2.HTTPError, e:
-            raise IOError('Malformed query: \n {0}'.format(json.dumps(self.query_dict ,indent = 4)))
-        else:
-            self.result = self.parse()
-            self.query_type = "timeseries"
-            return self.result
-
-    # Implements a groupBy query
-    def groupBy(self, **args):
-
-        query_dict = {"queryType" : "groupBy"}
-        valid_parts = ['dataSource', 'granularity', 'filter', 'aggregations', 'postAggregations',
-                       'intervals', 'dimensions']
-
-        for key, val in args.iteritems():
-            if key not in valid_parts:
-                raise ValueError('{0} is not a valid query component. The list of valid components is: \n {1}'.format(key, valid_parts))
-            elif key == "aggregations" :
-                query_dict[key] = build_aggregators(val)
-            elif key == "postAggregations":
-                query_dict[key] = build_post_aggregators(val)
-            elif key == "filter":
-                query_dict[key] = val.filter['filter']
-            else:
-                query_dict[key] = val
-
-        self.query_dict = query_dict
-
-        try:
-            self.post(query_dict)
-        except urllib2.HTTPError, e:
-            raise IOError('Malformed query: \n {0}'.format(json.dumps(self.query_dict ,indent = 4)))
-        else:
-            self.result = self.parse()
-            self.query_type = "groupby"
-            return self.parse()
-
-    # Implements a segmentMetadata query. This query type is for pulling the tsv-equivalent
-    # size and cardinality broken out by dimension in a specified data source.
-    def segmentMetadata(self, **args):
-
-        query_dict = {"queryType" : "segmentMetadata"}
-        valid_parts = ['dataSource', 'intervals']
-
-        for key, val in args.iteritems():
-            if key not in valid_parts:
-                raise ValueError('{0} is not a valid query component. The list of valid components is: \n {1}'.format(key, valid_parts))
-            else:
-                query_dict[key] = val
-
-        self.query_dict = query_dict
-
-        try:
-            self.post(query_dict)
-        except urllib2.HTTPError, e:
-            raise IOError('Malformed query: \n {0}'.format(json.dumps(self.query_dict ,indent = 4)))
-        else:
-            self.result = self.parse()
-            return self.parse()
-
-    # implements a time boundary query
-    def timeBoundary(self, **args):
-
-        query_dict = {"queryType" : "timeBoundary"}
-        valid_parts = ['dataSource']
-
-        for key, val in args.iteritems():
-            if key not in valid_parts:
-                raise ValueError('{0} is not a valid query component. The list of valid components is: \n {1}'.format(key, valid_parts))
-            else:
-                query_dict[key] = val
-
-        try:
-            self.post(query_dict)
-        except urllib2.HTTPError, e:
-            raise IOError('Malformed query: \n {0}'.format(json.dumps(self.query_dict ,indent = 4)))
-        else:
-            self.result = self.parse()
-            return self.parse()
-
-    # prints internal variables of the object
-    def describe(self):
-        print("url: " + self.url)
diff --git a/pyDruid/__init__.py b/pyDruid/__init__.py
deleted file mode 100644
index e69de29b..00000000
diff --git a/pyDruid/client.py b/pyDruid/client.py
deleted file mode 100755
index 3a2bc840..00000000
--- a/pyDruid/client.py
+++ /dev/null
@@ -1,222 +0,0 @@
-#
-# Copyright 2013 Metamarkets Group Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-from __future__ import division
-import urllib2
-import simplejson as json
-import csv
-import re
-import os
-import sys
-import pandas
-import dateutil.parser
-from matplotlib import *
-from matplotlib.pyplot import *
-from utils.aggregators import *
-from utils.postaggregator import *
-from utils.filters import *
-from utils.query_utils import *
-
-
-class pyDruid:
-
-    def __init__(self,url,endpoint):
-        self.url = url
-        self.endpoint = endpoint
-        self.result = None
-        self.result_json = None
-        self.query_type = None
-
-    # serializes a dict representation of the query into a json
-    # object, and sends it to bard via post, and gets back the
-    # json representation of the query result
-    def post(self,query):
-        querystr = json.dumps(query)
-        url = self.url + '/' + self.endpoint
-        headers = {'Content-Type' : 'application/json'}
-        req = urllib2.Request(url, querystr, headers)
-        res = urllib2.urlopen(req)
-        data = res.read()
-        self.result_json = data;
-        self.querystr = querystr
-        res.close()
-
-    # de-serializes the data returned from druid into a
-    # list of dicts
-    def parse(self):
-        if self.result_json:
-            res = json.loads(self.result_json)
-            return res
-        else:
-            print("Empty query")
-            return None
-
-    def export_tsv(self,dest_path):
-
-        f = open(dest_path,'wb')
-        tsv_file = csv.writer(f, delimiter = '\t')
-
-        if(self.query_type == "timeseries"):
-            header = self.result[0]['result'].keys()
-            header.append('timestamp')
-        elif(self.query_type == "groupby"):
-            header = self.result[0]['event'].keys()
-            header.append('timestamp')
-            header.append('version')
-
-        tsv_file.writerow(header)
-
-        # use unicodewriter to encode results in unicode
-        w = query_utils.UnicodeWriter(f)
-
-        if self.result:
-            if self.query_type == "timeseries":
-                for item in self.result:
-                    timestamp = item['timestamp']
-                    result = item['result']
-
-                    if type(result) is list:
-                        for line in result:
-                            w.writerow(line.values() + [timestamp])
-                    else: #timeseries
-                        w.writerow(result.values() + [timestamp])
-            elif self.query_type == "groupby":
-                for item in self.result:
-                    timestamp = item['timestamp']
-                    version = item['version']
-                    w.writerow(item['event'].values() + [timestamp] + [version])
-
-        f.close()
-
-    # Exports a JSON query object into a Pandas data-frame
-    def export_pandas(self):
-        if self.result:
-            if self.query_type == "timeseries":
-                nres = [v['result'].items() + [('timestamp',v['timestamp'])] for v in self.result]
-                nres = [dict(v) for v in nres]
-            else:
-                print('not implemented yet')
-                return None
-
-            df = pandas.DataFrame(nres)
-            df['timestamp'] = df['timestamp'].map(lambda x: dateutil.parser.parse(x))
-            df['t'] = dates.date2num(df['timestamp'])
-            return df
-
-    # implements a timeseries query
-    def timeseries(self, **args):
-
-        query_dict = {"queryType" : "timeseries"}
-        valid_parts = ['dataSource', 'granularity', 'filter', 'aggregations', 'postAggregations', 'intervals']
-
-        for key, val in args.iteritems():
-            if key not in valid_parts:
-                raise ValueError('{0} is not a valid query component. The list of valid components is: \n {1}'.format(key, valid_parts))
-            elif key == "aggregations" :
-                query_dict[key] = build_aggregators(val)
-            elif key == "postAggregations":
-                query_dict[key] = build_post_aggregators(val)
-            elif key == "filter":
-                query_dict[key] = val.filter['filter']
-            else:
-                query_dict[key] = val
-
-        self.query_dict = query_dict
-
-        try:
-            self.post(query_dict)
-        except urllib2.HTTPError, e:
-            raise IOError('Malformed query: \n {0}'.format(json.dumps(self.query_dict ,indent = 4)))
-        else:
-            self.result = self.parse()
-            self.query_type = "timeseries"
-            return self.result
-
-    # Implements a groupBy query
-    def groupBy(self, **args):
-
-        query_dict = {"queryType" : "groupBy"}
-        valid_parts = ['dataSource', 'granularity', 'filter', 'aggregations', 'postAggregations',
-                       'intervals', 'dimensions']
-
-        for key, val in args.iteritems():
-            if key not in valid_parts:
-                raise ValueError('{0} is not a valid query component. The list of valid components is: \n {1}'.format(key, valid_parts))
-            elif key == "aggregations" :
-                query_dict[key] = build_aggregators(val)
-            elif key == "postAggregations":
-                query_dict[key] = build_post_aggregators(val)
-            elif key == "filter":
-                query_dict[key] = val.filter['filter']
-            else:
-                query_dict[key] = val
-
-        self.query_dict = query_dict
-
-        try:
-            self.post(query_dict)
-        except urllib2.HTTPError, e:
-            raise IOError('Malformed query: \n {0}'.format(json.dumps(self.query_dict ,indent = 4)))
-        else:
-            self.result = self.parse()
-            self.query_type = "groupby"
-            return self.parse()
-
-    # Implements a segmentMetadata query. This query type is for pulling the tsv-equivalent
-    # size and cardinality broken out by dimension in a specified data source.
-    def segmentMetadata(self, **args):
-
-        query_dict = {"queryType" : "segmentMetadata"}
-        valid_parts = ['dataSource', 'intervals']
-
-        for key, val in args.iteritems():
-            if key not in valid_parts:
-                raise ValueError('{0} is not a valid query component. The list of valid components is: \n {1}'.format(key, valid_parts))
-            else:
-                query_dict[key] = val
-
-        self.query_dict = query_dict
-
-        try:
-            self.post(query_dict)
-        except urllib2.HTTPError, e:
-            raise IOError('Malformed query: \n {0}'.format(json.dumps(self.query_dict ,indent = 4)))
-        else:
-            self.result = self.parse()
-            return self.parse()
-
-    # implements a time boundary query
-    def timeBoundary(self, **args):
-
-        query_dict = {"queryType" : "timeBoundary"}
-        valid_parts = ['dataSource']
-
-        for key, val in args.iteritems():
-            if key not in valid_parts:
-                raise ValueError('{0} is not a valid query component. The list of valid components is: \n {1}'.format(key, valid_parts))
-            else:
-                query_dict[key] = val
-
-        try:
-            self.post(query_dict)
-        except urllib2.HTTPError, e:
-            raise IOError('Malformed query: \n {0}'.format(json.dumps(self.query_dict ,indent = 4)))
-        else:
-            self.result = self.parse()
-            return self.parse()
-
-    # prints internal variables of the object
-    def describe(self):
-        print("url: " + self.url)
diff --git a/pyDruid/pyDruidUtils/__init__.py b/pyDruid/pyDruidUtils/__init__.py
deleted file mode 100644
index e69de29b..00000000
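post() in the client above is urllib2 throughout, which dates the code to Python 2. For orientation only, a rough Python 3 equivalent of the same request flow; this function is illustrative and not part of the repository::

    import json
    from urllib.request import Request, urlopen

    def post(url, endpoint, query):
        # Same flow as pyDruid.post(): serialize the query dict,
        # POST it as JSON, and deserialize the broker's reply.
        req = Request(url + '/' + endpoint,
                      data=json.dumps(query).encode('utf-8'),
                      headers={'Content-Type': 'application/json'})
        with urlopen(req) as res:
            return json.loads(res.read().decode('utf-8'))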
diff --git a/pyDruid/pyDruidUtils/aggregators.py b/pyDruid/pyDruidUtils/aggregators.py
deleted file mode 100644
index efd54f42..00000000
--- a/pyDruid/pyDruidUtils/aggregators.py
+++ /dev/null
@@ -1,37 +0,0 @@
-#
-# Copyright 2013 Metamarkets Group Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-#create a json list of user-defined aggregations
-
-# Define the support aggregation types
-
-def longSum(raw_metric):
-    return {"type" : "longSum", "fieldName" : raw_metric}
-
-def doubleSum(raw_metric):
-    return {"type" : "doubleSum", "fieldName" : raw_metric}
-
-def min(raw_metric):
-    return {"type" : "min", "fieldName" : raw_metric}
-
-def max(raw_metric):
-    return {"type" : "max", "fieldName" : raw_metric}
-
-def count(raw_metric):
-    return {"type" : "count", "fieldName" : raw_metric}
-
-def build_aggregators(agg_input):
-    return [dict([('name',k)] + v.items()) for (k,v) in agg_input.iteritems()]
\ No newline at end of file
diff --git a/pyDruid/pyDruidUtils/filters.py b/pyDruid/pyDruidUtils/filters.py
deleted file mode 100644
index 562cf78d..00000000
--- a/pyDruid/pyDruidUtils/filters.py
+++ /dev/null
@@ -1,58 +0,0 @@
-#
-# Copyright 2013 Metamarkets Group Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-class Filter:
-
-    def __init__(self, **args):
-
-        # constrct a selector
-        if 'type' not in args.keys():
-            self.filter = {"filter": {"type": "selector",
-                                      "dimension" : args["dimension"],
-                                      "value" : args["value"]}}
-        # construct an and filter
-        elif args["type"] == "and":
-            self.filter = {"filter": {"type" : "and",
-                                      "fields" : args["fields"]}}
-        # construct an or filter
-        elif args["type"] == "or":
-            self.filter = {"filter": {"type" : "or",
-                                      "fields" : args["fields"]}}
-        # construct a not filter
-        elif args["type"] == "not":
-            self.filter = {"filter" : {"type" : "not",
-                                       "field" : args["field"]}}
-        else:
-            print("you've slain me. nevermind the teeth and the fingernails. the show must go on.")
-
-    def show(self):
-        print(json.dumps(self.filter, indent = 4))
-
-    def __and__(self, x):
-        return Filter(type = "and", fields = [self.filter['filter'], x.filter['filter']])
-
-    def __or__(self,x):
-        return Filter(type = "or", fields = [self.filter['filter'], x.filter['filter']])
-
-    def __invert__(self):
-        return Filter(type = "not", field = self.filter['filter'])
-
-class Dimension:
-
-    def __init__(self, dim):
-        self.dimension = dim
-
-    def __eq__(self,other):
-        return Filter(dimension = self.dimension, value = other)
\ No newline at end of file
diff --git a/pyDruid/pyDruidUtils/postaggregator.py b/pyDruid/pyDruidUtils/postaggregator.py
deleted file mode 100644
index 0d647850..00000000
--- a/pyDruid/pyDruidUtils/postaggregator.py
+++ /dev/null
@@ -1,56 +0,0 @@
-#
-# Copyright 2013 Metamarkets Group Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-import pprint as pp
-import re
-
-# determine whether arguments are strings or numeric constants
-def is_numeric(field):
-    for obj in [int, float, long]:
-        try:
-            obj(field)
-            return True
-        except:
-            pass
-
-    return False
-
-# specify field type as source field name (string) or constant and build json
-def field_spec(field):
-    if is_numeric(field):
-        return {"type" : "constant",
-                "name" : "constant_%d" % int(field),
-                "value" : field}
-    else:
-        return {"type" : "fieldAccess",
-                "fieldName" : field}
-
-# use regex to extract fields and operand from user-submitted postAgg string
-def parse_calc(postAggOpString):
-    fields = []
-    rgx = "(.+)[ ]?([*+/-])[ ]?(.+)"
-    postAggOp = re.findall(rgx, postAggOpString).pop()
-
-    if postAggOp:
-        operation = postAggOp[0]
-        fn = postAggOp[1]
-        fields.append(field_spec(postAggOp[0]))
-        fields.append(field_spec(postAggOp[2]))
-        base = {"type" : "arithmetic",
-                "fn" : fn,
-                "fields" : fields}
-        return base
-    else:
-        raise Exception("Not a valid postAggregation operation")
\ No newline at end of file
diff --git a/pyDruid/pyDruidUtils/query_utils.py b/pyDruid/pyDruidUtils/query_utils.py
deleted file mode 100644
index 4524e432..00000000
--- a/pyDruid/pyDruidUtils/query_utils.py
+++ /dev/null
@@ -1,43 +0,0 @@
-#
-# Copyright 2013 Metamarkets Group Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-import csv, codecs, cStringIO
-
-# A special CSV writer which will write rows to TSV file "f", which is encoded in utf-8.
-# this is necessary because the values in druid are not all ASCII.
-class UnicodeWriter:
-
-    def __init__(self, f, dialect="excel-tab", encoding="utf-8", **kwds): #delimiter="\t"
-        # Redirect output to a queue
-        self.queue = cStringIO.StringIO()
-        self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
-        self.stream = f
-        self.encoder = codecs.getincrementalencoder(encoding)()
-
-    def writerow(self, row):
-        self.writer.writerow([s.encode("utf-8") if isinstance(s, unicode) else s for s in row])
-        # Fetch UTF-8 output from the queue ...
-        data = self.queue.getvalue()
-        data = data.decode("utf-8")
-        # ... and reencode it into the target encoding
-        data = self.encoder.encode(data)
-        # write to the target stream
-        self.stream.write(data)
-        # empty queue
-        self.queue.truncate(0)
-
-    def writerows(self, rows):
-        for row in rows:
-            self.writerow(row)
\ No newline at end of file
diff --git a/pyDruid/test/__init__.py b/pyDruid/test/__init__.py
deleted file mode 100644
index e69de29b..00000000
diff --git a/pyDruid/utils/__init__.py b/pyDruid/utils/__init__.py
deleted file mode 100644
index e69de29b..00000000
diff --git a/pyDruid/utils/aggregators.py b/pyDruid/utils/aggregators.py
deleted file mode 100644
index efd54f42..00000000
--- a/pyDruid/utils/aggregators.py
+++ /dev/null
@@ -1,37 +0,0 @@
-#
-# Copyright 2013 Metamarkets Group Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-#create a json list of user-defined aggregations
-
-# Define the support aggregation types
-
-def longSum(raw_metric):
-    return {"type" : "longSum", "fieldName" : raw_metric}
-
-def doubleSum(raw_metric):
-    return {"type" : "doubleSum", "fieldName" : raw_metric}
-
-def min(raw_metric):
-    return {"type" : "min", "fieldName" : raw_metric}
-
-def max(raw_metric):
-    return {"type" : "max", "fieldName" : raw_metric}
-
-def count(raw_metric):
-    return {"type" : "count", "fieldName" : raw_metric}
-
-def build_aggregators(agg_input):
-    return [dict([('name',k)] + v.items()) for (k,v) in agg_input.iteritems()]
\ No newline at end of file
diff --git a/pyDruid/utils/filters.py b/pyDruid/utils/filters.py
deleted file mode 100644
index 562cf78d..00000000
--- a/pyDruid/utils/filters.py
+++ /dev/null
@@ -1,58 +0,0 @@
-#
-# Copyright 2013 Metamarkets Group Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-class Filter:
-
-    def __init__(self, **args):
-
-        # constrct a selector
-        if 'type' not in args.keys():
-            self.filter = {"filter": {"type": "selector",
-                                      "dimension" : args["dimension"],
-                                      "value" : args["value"]}}
-        # construct an and filter
-        elif args["type"] == "and":
-            self.filter = {"filter": {"type" : "and",
-                                      "fields" : args["fields"]}}
-        # construct an or filter
-        elif args["type"] == "or":
-            self.filter = {"filter": {"type" : "or",
-                                      "fields" : args["fields"]}}
-        # construct a not filter
-        elif args["type"] == "not":
-            self.filter = {"filter" : {"type" : "not",
-                                       "field" : args["field"]}}
-        else:
-            print("you've slain me. nevermind the teeth and the fingernails. the show must go on.")
-
-    def show(self):
-        print(json.dumps(self.filter, indent = 4))
-
-    def __and__(self, x):
-        return Filter(type = "and", fields = [self.filter['filter'], x.filter['filter']])
-
-    def __or__(self,x):
-        return Filter(type = "or", fields = [self.filter['filter'], x.filter['filter']])
-
-    def __invert__(self):
-        return Filter(type = "not", field = self.filter['filter'])
-
-class Dimension:
-
-    def __init__(self, dim):
-        self.dimension = dim
-
-    def __eq__(self,other):
-        return Filter(dimension = self.dimension, value = other)
\ No newline at end of file
diff --git a/pyDruid/utils/postaggregator.py b/pyDruid/utils/postaggregator.py
deleted file mode 100644
index 0d647850..00000000
--- a/pyDruid/utils/postaggregator.py
+++ /dev/null
@@ -1,56 +0,0 @@
-#
-# Copyright 2013 Metamarkets Group Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-import pprint as pp
-import re
-
-# determine whether arguments are strings or numeric constants
-def is_numeric(field):
-    for obj in [int, float, long]:
-        try:
-            obj(field)
-            return True
-        except:
-            pass
-
-    return False
-
-# specify field type as source field name (string) or constant and build json
-def field_spec(field):
-    if is_numeric(field):
-        return {"type" : "constant",
-                "name" : "constant_%d" % int(field),
-                "value" : field}
-    else:
-        return {"type" : "fieldAccess",
-                "fieldName" : field}
-
-# use regex to extract fields and operand from user-submitted postAgg string
-def parse_calc(postAggOpString):
-    fields = []
-    rgx = "(.+)[ ]?([*+/-])[ ]?(.+)"
-    postAggOp = re.findall(rgx, postAggOpString).pop()
-
-    if postAggOp:
-        operation = postAggOp[0]
-        fn = postAggOp[1]
-        fields.append(field_spec(postAggOp[0]))
-        fields.append(field_spec(postAggOp[2]))
-        base = {"type" : "arithmetic",
-                "fn" : fn,
-                "fields" : fields}
-        return base
-    else:
-        raise Exception("Not a valid postAggregation operation")
\ No newline at end of file
diff --git a/pyDruid/utils/query_utils.py b/pyDruid/utils/query_utils.py
deleted file mode 100644
index 4524e432..00000000
--- a/pyDruid/utils/query_utils.py
+++ /dev/null
@@ -1,43 +0,0 @@
-#
-# Copyright 2013 Metamarkets Group Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-import csv, codecs, cStringIO
-
-# A special CSV writer which will write rows to TSV file "f", which is encoded in utf-8.
-# this is necessary because the values in druid are not all ASCII.
-class UnicodeWriter:
-
-    def __init__(self, f, dialect="excel-tab", encoding="utf-8", **kwds): #delimiter="\t"
-        # Redirect output to a queue
-        self.queue = cStringIO.StringIO()
-        self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
-        self.stream = f
-        self.encoder = codecs.getincrementalencoder(encoding)()
-
-    def writerow(self, row):
-        self.writer.writerow([s.encode("utf-8") if isinstance(s, unicode) else s for s in row])
-        # Fetch UTF-8 output from the queue ...
-        data = self.queue.getvalue()
-        data = data.decode("utf-8")
-        # ... and reencode it into the target encoding
-        data = self.encoder.encode(data)
-        # write to the target stream
-        self.stream.write(data)
-        # empty queue
-        self.queue.truncate(0)
-
-    def writerows(self, rows):
-        for row in rows:
-            self.writerow(row)
\ No newline at end of file
diff --git a/pyDruidUtils/__init__.py b/pyDruidUtils/__init__.py
deleted file mode 100644
index e69de29b..00000000
diff --git a/pyDruidUtils/aggregators.py b/pyDruidUtils/aggregators.py
deleted file mode 100644
index efd54f42..00000000
--- a/pyDruidUtils/aggregators.py
+++ /dev/null
@@ -1,37 +0,0 @@
-#
-# Copyright 2013 Metamarkets Group Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-#create a json list of user-defined aggregations
-
-# Define the support aggregation types
-
-def longSum(raw_metric):
-    return {"type" : "longSum", "fieldName" : raw_metric}
-
-def doubleSum(raw_metric):
-    return {"type" : "doubleSum", "fieldName" : raw_metric}
-
-def min(raw_metric):
-    return {"type" : "min", "fieldName" : raw_metric}
-
-def max(raw_metric):
-    return {"type" : "max", "fieldName" : raw_metric}
-
-def count(raw_metric):
-    return {"type" : "count", "fieldName" : raw_metric}
-
-def build_aggregators(agg_input):
-    return [dict([('name',k)] + v.items()) for (k,v) in agg_input.iteritems()]
\ No newline at end of file
diff --git a/pyDruidUtils/filters.py b/pyDruidUtils/filters.py
deleted file mode 100644
index 562cf78d..00000000
--- a/pyDruidUtils/filters.py
+++ /dev/null
@@ -1,58 +0,0 @@
-#
-# Copyright 2013 Metamarkets Group Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-class Filter:
-
-    def __init__(self, **args):
-
-        # constrct a selector
-        if 'type' not in args.keys():
-            self.filter = {"filter": {"type": "selector",
-                                      "dimension" : args["dimension"],
-                                      "value" : args["value"]}}
-        # construct an and filter
-        elif args["type"] == "and":
-            self.filter = {"filter": {"type" : "and",
-                                      "fields" : args["fields"]}}
-        # construct an or filter
-        elif args["type"] == "or":
-            self.filter = {"filter": {"type" : "or",
-                                      "fields" : args["fields"]}}
-        # construct a not filter
-        elif args["type"] == "not":
-            self.filter = {"filter" : {"type" : "not",
-                                       "field" : args["field"]}}
-        else:
-            print("you've slain me. nevermind the teeth and the fingernails. the show must go on.")
-
-    def show(self):
-        print(json.dumps(self.filter, indent = 4))
-
-    def __and__(self, x):
-        return Filter(type = "and", fields = [self.filter['filter'], x.filter['filter']])
-
-    def __or__(self,x):
-        return Filter(type = "or", fields = [self.filter['filter'], x.filter['filter']])
-
-    def __invert__(self):
-        return Filter(type = "not", field = self.filter['filter'])
-
-class Dimension:
-
-    def __init__(self, dim):
-        self.dimension = dim
-
-    def __eq__(self,other):
-        return Filter(dimension = self.dimension, value = other)
\ No newline at end of file
diff --git a/pyDruidUtils/postaggregator.py b/pyDruidUtils/postaggregator.py
deleted file mode 100644
index 0d647850..00000000
--- a/pyDruidUtils/postaggregator.py
+++ /dev/null
@@ -1,56 +0,0 @@
-#
-# Copyright 2013 Metamarkets Group Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-import pprint as pp
-import re
-
-# determine whether arguments are strings or numeric constants
-def is_numeric(field):
-    for obj in [int, float, long]:
-        try:
-            obj(field)
-            return True
-        except:
-            pass
-
-    return False
-
-# specify field type as source field name (string) or constant and build json
-def field_spec(field):
-    if is_numeric(field):
-        return {"type" : "constant",
-                "name" : "constant_%d" % int(field),
-                "value" : field}
-    else:
-        return {"type" : "fieldAccess",
-                "fieldName" : field}
-
-# use regex to extract fields and operand from user-submitted postAgg string
-def parse_calc(postAggOpString):
-    fields = []
-    rgx = "(.+)[ ]?([*+/-])[ ]?(.+)"
-    postAggOp = re.findall(rgx, postAggOpString).pop()
-
-    if postAggOp:
-        operation = postAggOp[0]
-        fn = postAggOp[1]
-        fields.append(field_spec(postAggOp[0]))
-        fields.append(field_spec(postAggOp[2]))
-        base = {"type" : "arithmetic",
-                "fn" : fn,
-                "fields" : fields}
-        return base
-    else:
-        raise Exception("Not a valid postAggregation operation")
\ No newline at end of file
diff --git a/pyDruidUtils/query_utils.py b/pyDruidUtils/query_utils.py
deleted file mode 100644
index 4524e432..00000000
--- a/pyDruidUtils/query_utils.py
+++ /dev/null
@@ -1,43 +0,0 @@
-#
-# Copyright 2013 Metamarkets Group Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-import csv, codecs, cStringIO
-
-# A special CSV writer which will write rows to TSV file "f", which is encoded in utf-8.
-# this is necessary because the values in druid are not all ASCII.
-class UnicodeWriter:
-
-    def __init__(self, f, dialect="excel-tab", encoding="utf-8", **kwds): #delimiter="\t"
-        # Redirect output to a queue
-        self.queue = cStringIO.StringIO()
-        self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
-        self.stream = f
-        self.encoder = codecs.getincrementalencoder(encoding)()
-
-    def writerow(self, row):
-        self.writer.writerow([s.encode("utf-8") if isinstance(s, unicode) else s for s in row])
-        # Fetch UTF-8 output from the queue ...
-        data = self.queue.getvalue()
-        data = data.decode("utf-8")
-        # ... and reencode it into the target encoding
-        data = self.encoder.encode(data)
-        # write to the target stream
-        self.stream.write(data)
-        # empty queue
-        self.queue.truncate(0)
-
-    def writerows(self, rows):
-        for row in rows:
-            self.writerow(row)
\ No newline at end of file
diff --git a/test/__init__.py b/test/__init__.py
deleted file mode 100644
index e69de29b..00000000
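A closing note on the four identical copies of UnicodeWriter deleted above: Python 3's csv module writes str natively, so the whole queue-and-reencode mechanism collapses to opening the destination file with an encoding. A sketch of the modern equivalent::

    import csv

    # Python 3: csv.writer handles non-ASCII text directly on a UTF-8 stream,
    # so no intermediate buffer or incremental encoder is needed.
    with open("result.tsv", "w", encoding="utf-8", newline="") as f:
        w = csv.writer(f, dialect="excel-tab")
        w.writerow(["timestamp", "count"])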