From 7a342925709e4a362abfb700e0e4eb816e052c52 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Thu, 13 Jun 2024 16:25:12 +0200 Subject: [PATCH 1/5] refactor: extract query API --- influxdb_client_3/__init__.py | 54 ++-------- influxdb_client_3/query/query_api.py | 105 ++++++++++++++++++++ tests/test_influxdb_client_3.py | 8 +- tests/test_influxdb_client_3_integration.py | 1 + tests/test_query.py | 17 +--- 5 files changed, 121 insertions(+), 64 deletions(-) create mode 100644 influxdb_client_3/query/query_api.py diff --git a/influxdb_client_3/__init__.py b/influxdb_client_3/__init__.py index b01d0e1..18b785a 100644 --- a/influxdb_client_3/__init__.py +++ b/influxdb_client_3/__init__.py @@ -2,8 +2,8 @@ import urllib.parse import pyarrow as pa -from pyarrow.flight import FlightClient, Ticket, FlightCallOptions +from influxdb_client_3.query.query_api import QueryApi as _QueryApi from influxdb_client_3.read_file import UploadFile from influxdb_client_3.write_client import InfluxDBClient as _InfluxDBClient, WriteOptions, Point from influxdb_client_3.write_client.client.exceptions import InfluxDBError @@ -144,23 +144,15 @@ def __init__( **kwargs) self._write_api = _WriteApi(influxdb_client=self._client, **self._write_client_options) - self._flight_client_options = flight_client_options or {} if query_port_overwrite is not None: port = query_port_overwrite - - gen_opts = [ - ("grpc.secondary_user_agent", USER_AGENT) - ] - - self._flight_client_options["generic_options"] = gen_opts - if scheme == 'https': connection_string = f"grpc+tls://{hostname}:{port}" else: connection_string = f"grpc+tcp://{hostname}:{port}" - - self._flight_client = FlightClient(connection_string, **self._flight_client_options) + self._query_api = _QueryApi(connection_string=connection_string, token=token, + flight_client_options=flight_client_options) def write(self, record=None, database=None, **kwargs): """ @@ -258,48 +250,14 @@ def query(self, query: str, language: str = "sql", mode: str = "all", database: database = self._database try: - # Create an authorization header - optargs = { - "headers": [(b"authorization", f"Bearer {self._token}".encode('utf-8'))], - "timeout": 300 - } - opts = _merge_options(optargs, exclude_keys=['query_parameters'], custom=kwargs) - _options = FlightCallOptions(**opts) - - # - # Ticket data - # - ticket_data = { - "database": database, - "sql_query": query, - "query_type": language - } - # add query parameters - query_parameters = kwargs.get("query_parameters", None) - if query_parameters: - ticket_data["params"] = query_parameters - - ticket = Ticket(json.dumps(ticket_data).encode('utf-8')) - flight_reader = self._flight_client.do_get(ticket, _options) - - mode_func = { - "all": flight_reader.read_all, - "pandas": flight_reader.read_pandas, - "polars": lambda: pl.from_arrow(flight_reader.read_all()), - "chunk": lambda: flight_reader, - "reader": flight_reader.to_reader, - "schema": lambda: flight_reader.schema - - }.get(mode, flight_reader.read_all) - - return mode_func() if callable(mode_func) else mode_func - except Exception as e: + return self._query_api.query(query=query, language=language, mode=mode, database=database, **kwargs) + except InfluxDBError as e: raise e def close(self): """Close the client and clean up resources.""" self._write_api.close() - self._flight_client.close() + self._query_api.close() self._client.close() def __enter__(self): diff --git a/influxdb_client_3/query/query_api.py b/influxdb_client_3/query/query_api.py new file mode 100644 index 0000000..f521050 --- /dev/null +++ b/influxdb_client_3/query/query_api.py @@ -0,0 +1,105 @@ +"""Query data in InfluxDB 3.""" + +# coding: utf-8 +import json + +from pyarrow.flight import FlightClient, Ticket, FlightCallOptions, FlightStreamReader +from influxdb_client_3.version import USER_AGENT + + +class QueryApi(object): + """ + Implementation for '/api/v2/query' endpoint. + + Example: + .. code-block:: python + + from influxdb_client import InfluxDBClient + + + # Initialize instance of QueryApi + with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: + query_api = client.query_api() + """ + + def __init__(self, + connection_string, + token, + flight_client_options, + **kwargs) -> None: + """ + Initialize defaults. + + :param connection_string: Flight/gRPC connection string + :param token: access token + :param flight_client_options: Flight client options + """ + self._token = token + self._flight_client_options = flight_client_options or {} + self._flight_client_options["generic_options"] = [ + ("grpc.secondary_user_agent", USER_AGENT) + ] + self._flight_client = FlightClient(connection_string, **self._flight_client_options) + + def query(self, query: str, language: str, mode: str, database: str, **kwargs): + """Query data from InfluxDB. + + :param query: The query to execute on the database. + :param language: The query language. + :param mode: The mode to use for the query. + It should be one of "all", "pandas", "polars", "chunk", "reader" or "schema". + :param database: The database to query from. + :param kwargs: Additional arguments to pass to the ``FlightCallOptions headers``. + For example, it can be used to set up per request headers. + :keyword query_parameters: The query parameters to use in the query. + It should be a ``dictionary`` of key-value pairs. + :return: The query result in the specified mode. + """ + from influxdb_client_3 import polars as has_polars, _merge_options as merge_options + try: + # Create an authorization header + optargs = { + "headers": [(b"authorization", f"Bearer {self._token}".encode('utf-8'))], + "timeout": 300 + } + opts = merge_options(optargs, exclude_keys=['query_parameters'], custom=kwargs) + _options = FlightCallOptions(**opts) + + # + # Ticket data + # + ticket_data = { + "database": database, + "sql_query": query, + "query_type": language + } + # add query parameters + query_parameters = kwargs.get("query_parameters", None) + if query_parameters: + ticket_data["params"] = query_parameters + + ticket = Ticket(json.dumps(ticket_data).encode('utf-8')) + flight_reader = self._do_get(ticket, _options) + + mode_funcs = { + "all": flight_reader.read_all, + "pandas": flight_reader.read_pandas, + "chunk": lambda: flight_reader, + "reader": flight_reader.to_reader, + "schema": lambda: flight_reader.schema + } + if has_polars: + import polars as pl + mode_funcs["polars"] = lambda: pl.from_arrow(flight_reader.read_all()) + mode_func = mode_funcs.get(mode, flight_reader.read_all) + + return mode_func() if callable(mode_func) else mode_func + except Exception as e: + raise e + + def _do_get(self, ticket: Ticket, options: FlightCallOptions = None) -> FlightStreamReader: + return self._flight_client.do_get(ticket, options) + + def close(self): + """Close the Flight client.""" + self._flight_client.close() diff --git a/tests/test_influxdb_client_3.py b/tests/test_influxdb_client_3.py index 5adfc1e..42c58a0 100644 --- a/tests/test_influxdb_client_3.py +++ b/tests/test_influxdb_client_3.py @@ -8,11 +8,11 @@ class TestInfluxDBClient3(unittest.TestCase): @patch('influxdb_client_3._InfluxDBClient') @patch('influxdb_client_3._WriteApi') - @patch('influxdb_client_3.FlightClient') - def setUp(self, mock_flight_client, mock_write_api, mock_influx_db_client): + @patch('influxdb_client_3._QueryApi') + def setUp(self, mock_query_api, mock_write_api, mock_influx_db_client): self.mock_influx_db_client = mock_influx_db_client self.mock_write_api = mock_write_api - self.mock_flight_client = mock_flight_client + self.mock_query_api = mock_query_api self.client = InfluxDBClient3( host="localhost", org="my_org", @@ -25,7 +25,7 @@ def test_init(self): self.assertEqual(self.client._database, "my_db") self.assertEqual(self.client._client, self.mock_influx_db_client.return_value) self.assertEqual(self.client._write_api, self.mock_write_api.return_value) - self.assertEqual(self.client._flight_client, self.mock_flight_client.return_value) + self.assertEqual(self.client._query_api, self.mock_query_api.return_value) if __name__ == '__main__': diff --git a/tests/test_influxdb_client_3_integration.py b/tests/test_influxdb_client_3_integration.py index 0275764..235d83d 100644 --- a/tests/test_influxdb_client_3_integration.py +++ b/tests/test_influxdb_client_3_integration.py @@ -39,6 +39,7 @@ def test_write_and_query(self): df = self.client.query(sql, mode="pandas", query_parameters={'type': 'used', 'test_id': test_id}) + self.assertIsNotNone(df) self.assertEqual(1, len(df)) self.assertEqual(test_id, df['test_id'][0]) self.assertEqual(123.0, df['value'][0]) diff --git a/tests/test_query.py b/tests/test_query.py index ad91063..77490a9 100644 --- a/tests/test_query.py +++ b/tests/test_query.py @@ -66,7 +66,8 @@ class HeaderCheckServerMiddleware(ServerMiddleware): Middleware needed to catch request headers via factory N.B. As found in pyarrow tests """ - def __init__(self, token): + def __init__(self, token, *args, **kwargs): + super().__init__(*args, **kwargs) self.token = token def sending_headers(self): @@ -114,25 +115,17 @@ def test_influx_default_query_headers(): class TestQuery(unittest.TestCase): - @patch('influxdb_client_3._InfluxDBClient') - @patch('influxdb_client_3._WriteApi') - @patch('influxdb_client_3.FlightClient') - def setUp(self, mock_flight_client, mock_write_api, mock_influx_db_client): - self.mock_influx_db_client = mock_influx_db_client - self.mock_write_api = mock_write_api - self.mock_flight_client = mock_flight_client + def setUp(self): self.client = InfluxDBClient3( host="localhost", org="my_org", database="my_db", token="my_token" ) - self.client._flight_client = mock_flight_client - self.client._write_api = mock_write_api def test_query_without_parameters(self): mock_do_get = Mock() - self.client._flight_client.do_get = mock_do_get + self.client._query_api._do_get = mock_do_get self.client.query('SELECT * FROM measurement') @@ -146,7 +139,7 @@ def test_query_without_parameters(self): def test_query_with_parameters(self): mock_do_get = Mock() - self.client._flight_client.do_get = mock_do_get + self.client._query_api._do_get = mock_do_get self.client.query('SELECT * FROM measurement WHERE time > $time', query_parameters={"time": "2021-01-01"}) From e8eb7847fdabc5a7c709868deff5b522f5f06e61 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Thu, 13 Jun 2024 16:31:47 +0200 Subject: [PATCH 2/5] test: add pytest.ini with custom marker definition --- pytest.ini | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 pytest.ini diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..8f80fda --- /dev/null +++ b/pytest.ini @@ -0,0 +1,3 @@ +[pytest] +markers = + integration: marks integration tests (deselect with '-m "not integration"') From 32ce8ea9668ea690901b873e6c3871bd9aa3c452 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Fri, 14 Jun 2024 09:50:19 +0200 Subject: [PATCH 3/5] fix: polars pkg check --- influxdb_client_3/__init__.py | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/influxdb_client_3/__init__.py b/influxdb_client_3/__init__.py index 18b785a..bea7a17 100644 --- a/influxdb_client_3/__init__.py +++ b/influxdb_client_3/__init__.py @@ -1,7 +1,7 @@ -import json import urllib.parse import pyarrow as pa +import importlib.util from influxdb_client_3.query.query_api import QueryApi as _QueryApi from influxdb_client_3.read_file import UploadFile @@ -10,14 +10,8 @@ from influxdb_client_3.write_client.client.write_api import WriteApi as _WriteApi, SYNCHRONOUS, ASYNCHRONOUS, \ PointSettings from influxdb_client_3.write_client.domain.write_precision import WritePrecision -from influxdb_client_3.version import USER_AGENT -try: - import polars as pl - - polars = True -except ImportError: - polars = False +polars = importlib.util.find_spec("polars") is not None def write_client_options(**kwargs): From b1912e93073107945f2473704cbd0dc3e4fe5761 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Fri, 14 Jun 2024 09:52:24 +0200 Subject: [PATCH 4/5] test: remove unused import --- tests/test_query.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_query.py b/tests/test_query.py index 77490a9..0ad7e3f 100644 --- a/tests/test_query.py +++ b/tests/test_query.py @@ -1,6 +1,6 @@ import unittest import struct -from unittest.mock import Mock, patch, ANY +from unittest.mock import Mock, ANY from pyarrow import ( array, From 56089aa292941a1a5777f7e9bc5e338533ac04c4 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Wed, 19 Jun 2024 09:27:08 +0200 Subject: [PATCH 5/5] fix: remove unused optional kwargs --- influxdb_client_3/query/query_api.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/influxdb_client_3/query/query_api.py b/influxdb_client_3/query/query_api.py index f521050..3b36d20 100644 --- a/influxdb_client_3/query/query_api.py +++ b/influxdb_client_3/query/query_api.py @@ -25,8 +25,7 @@ class QueryApi(object): def __init__(self, connection_string, token, - flight_client_options, - **kwargs) -> None: + flight_client_options) -> None: """ Initialize defaults.