Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Polars optional import #95

Merged
merged 11 commits into from
Jun 26, 2024
1 change: 0 additions & 1 deletion influxdb_client_3/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import urllib.parse

import pyarrow as pa
import importlib.util

Expand Down
4 changes: 2 additions & 2 deletions influxdb_client_3/write_client/client/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@
import os
from typing import Iterable

from influxdb_client_3.write_client.client.write.dataframe_serializer import DataframeSerializer, \
PolarsDataframeSerializer
from influxdb_client_3.write_client.client.write.dataframe_serializer import DataframeSerializer
from influxdb_client_3.write_client.configuration import Configuration
from influxdb_client_3.write_client.rest import _UTF_8_encoding
from influxdb_client_3.write_client.service.write_service import WriteService
Expand Down Expand Up @@ -249,6 +248,7 @@
self._serialize(Point.from_dict(record, write_precision=write_precision, **kwargs),
write_precision, payload, **kwargs)
elif 'polars' in str(type(record)):
from influxdb_client_3.write_client.client.write.dataframe_serializer import PolarsDataframeSerializer

Check warning on line 251 in influxdb_client_3/write_client/client/_base.py

View check run for this annotation

Codecov / codecov/patch

influxdb_client_3/write_client/client/_base.py#L251

Added line #L251 was not covered by tests
serializer = PolarsDataframeSerializer(record, self._point_settings, write_precision, **kwargs)
self._serialize(serializer.serialize(), write_precision, payload, **kwargs)

Expand Down
148 changes: 0 additions & 148 deletions influxdb_client_3/write_client/client/write/dataframe_serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,137 +284,6 @@ def number_of_chunks(self):
return self.number_of_chunks


class PolarsDataframeSerializer:
"""Serialize DataFrame into LineProtocols."""

def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION, chunk_size: int = None,
**kwargs) -> None:
"""
Init serializer.

:param data_frame: Polars DataFrame to serialize
:param point_settings: Default Tags
:param precision: The precision for the unix timestamps within the body line-protocol.
:param chunk_size: The size of chunk for serializing into chunks.
:key data_frame_measurement_name: name of measurement for writing Polars DataFrame
:key data_frame_tag_columns: list of DataFrame columns which are tags, rest columns will be fields
:key data_frame_timestamp_column: name of DataFrame column which contains a timestamp.
:key data_frame_timestamp_timezone: name of the timezone which is used for timestamp column
"""

self.data_frame = data_frame
self.point_settings = point_settings
self.precision = precision
self.chunk_size = chunk_size
self.measurement_name = kwargs.get("data_frame_measurement_name", "measurement")
self.tag_columns = kwargs.get("data_frame_tag_columns", [])
self.timestamp_column = kwargs.get("data_frame_timestamp_column", None)
self.timestamp_timezone = kwargs.get("data_frame_timestamp_timezone", None)

self.column_indices = {name: index for index, name in enumerate(data_frame.columns)}

if self.timestamp_column is None or self.timestamp_column not in self.column_indices:
raise ValueError(
f"Timestamp column {self.timestamp_column} not found in DataFrame. Please define a valid timestamp "
f"column.")

#
# prepare chunks
#
if chunk_size is not None:
self.number_of_chunks = int(math.ceil(len(data_frame) / float(chunk_size)))
self.chunk_size = chunk_size
else:
self.number_of_chunks = None

def escape_key(self, value):
return str(value).translate(_ESCAPE_KEY)

def escape_value(self, value):
return str(value).translate(_ESCAPE_STRING)

def to_line_protocol(self, row):
# Filter out None or empty values for tags
tags = ""

tags = ",".join(
f'{self.escape_key(col)}={self.escape_key(row[self.column_indices[col]])}'
for col in self.tag_columns
if row[self.column_indices[col]] is not None and row[self.column_indices[col]] != ""
)

if self.point_settings.defaultTags:
default_tags = ",".join(
f'{self.escape_key(key)}={self.escape_key(value)}'
for key, value in self.point_settings.defaultTags.items()
)
# Ensure there's a comma between existing tags and default tags if both are present
if tags and default_tags:
tags += ","
tags += default_tags

# add escape symbols for special characters to tags

fields = ",".join(
f"{col}=\"{self.escape_value(row[self.column_indices[col]])}\"" if isinstance(row[self.column_indices[col]],
str)
else f"{col}={str(row[self.column_indices[col]]).lower()}" if isinstance(row[self.column_indices[col]],
bool) # Check for bool first
else f"{col}={row[self.column_indices[col]]}i" if isinstance(row[self.column_indices[col]], int)
else f"{col}={row[self.column_indices[col]]}"
for col in self.column_indices
if col not in self.tag_columns + [self.timestamp_column] and
row[self.column_indices[col]] is not None and row[self.column_indices[col]] != ""
)

# Access the Unix timestamp
timestamp = row[self.column_indices[self.timestamp_column]]
if tags != "":
line_protocol = f"{self.measurement_name},{tags} {fields} {timestamp}"
else:
line_protocol = f"{self.measurement_name} {fields} {timestamp}"

return line_protocol

def serialize(self, chunk_idx: int = None):
from ...extras import pl

df = self.data_frame

# Check if the timestamp column is already an integer
if df[self.timestamp_column].dtype in [pl.Int32, pl.Int64]:
# The timestamp column is already an integer, assuming it's in Unix format
pass
else:
# Convert timestamp to Unix timestamp based on specified precision
if self.precision in [None, 'ns']:
df = df.with_columns(
pl.col(self.timestamp_column).dt.epoch(time_unit="ns").alias(self.timestamp_column))
elif self.precision == 'us':
df = df.with_columns(
pl.col(self.timestamp_column).dt.epoch(time_unit="us").alias(self.timestamp_column))
elif self.precision == 'ms':
df = df.with_columns(
pl.col(self.timestamp_column).dt.epoch(time_unit="ms").alias(self.timestamp_column))
elif self.precision == 's':
df = df.with_columns(pl.col(self.timestamp_column).dt.epoch(time_unit="s").alias(self.timestamp_column))
else:
raise ValueError(f"Unsupported precision: {self.precision}")

if chunk_idx is None:
chunk = df
else:
logger.debug("Serialize chunk %s/%s ...", chunk_idx + 1, self.number_of_chunks)
chunk = df[chunk_idx * self.chunk_size:(chunk_idx + 1) * self.chunk_size]

# Apply the UDF to each row
line_protocol_expr = chunk.apply(self.to_line_protocol, return_dtype=pl.Object)

lp = line_protocol_expr['map'].to_list()

return lp


def data_frame_to_list_of_points(data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION, **kwargs):
"""
Serialize DataFrame into LineProtocols.
Expand All @@ -430,20 +299,3 @@ def data_frame_to_list_of_points(data_frame, point_settings, precision=DEFAULT_W
:key data_frame_timestamp_timezone: name of the timezone which is used for timestamp column - ``DataFrame``
""" # noqa: E501
return DataframeSerializer(data_frame, point_settings, precision, **kwargs).serialize()


def polars_data_frame_to_list_of_points(data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION, **kwargs):
"""
Serialize DataFrame into LineProtocols.

:param data_frame: Pandas DataFrame to serialize
:param point_settings: Default Tags
:param precision: The precision for the unix timestamps within the body line-protocol.
:key data_frame_measurement_name: name of measurement for writing Pandas DataFrame
:key data_frame_tag_columns: list of DataFrame columns which are tags, rest columns will be fields
:key data_frame_timestamp_column: name of DataFrame column which contains a timestamp. The column can be defined as a :class:`~str` value
formatted as `2018-10-26`, `2018-10-26 12:00`, `2018-10-26 12:00:00-05:00`
or other formats and types supported by `pandas.to_datetime <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.to_datetime.html#pandas.to_datetime>`_ - ``DataFrame``
:key data_frame_timestamp_timezone: name of the timezone which is used for timestamp column - ``DataFrame``
""" # noqa: E501
return PolarsDataframeSerializer(data_frame, point_settings, precision, **kwargs).serialize()
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
"""
Functions for serialize Polars DataFrame.

Much of the code here is inspired by that in the aioinflux packet found here: https://github.com/gusutabopb/aioinflux
"""

import logging
import math

from influxdb_client_3.write_client.client.write.point import _ESCAPE_KEY, _ESCAPE_STRING, DEFAULT_WRITE_PRECISION

logger = logging.getLogger('influxdb_client.client.write.polars_dataframe_serializer')


class PolarsDataframeSerializer:
"""Serialize DataFrame into LineProtocols."""

def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION, chunk_size: int = None,
**kwargs) -> None:
"""
Init serializer.

:param data_frame: Polars DataFrame to serialize
:param point_settings: Default Tags
:param precision: The precision for the unix timestamps within the body line-protocol.
:param chunk_size: The size of chunk for serializing into chunks.
:key data_frame_measurement_name: name of measurement for writing Polars DataFrame
:key data_frame_tag_columns: list of DataFrame columns which are tags, rest columns will be fields
:key data_frame_timestamp_column: name of DataFrame column which contains a timestamp.
:key data_frame_timestamp_timezone: name of the timezone which is used for timestamp column
"""

self.data_frame = data_frame
self.point_settings = point_settings
self.precision = precision
self.chunk_size = chunk_size
self.measurement_name = kwargs.get("data_frame_measurement_name", "measurement")
self.tag_columns = kwargs.get("data_frame_tag_columns", [])
self.timestamp_column = kwargs.get("data_frame_timestamp_column", None)
self.timestamp_timezone = kwargs.get("data_frame_timestamp_timezone", None)

self.column_indices = {name: index for index, name in enumerate(data_frame.columns)}

if self.timestamp_column is None or self.timestamp_column not in self.column_indices:
raise ValueError(

Check warning on line 45 in influxdb_client_3/write_client/client/write/polars_dataframe_serializer.py

View check run for this annotation

Codecov / codecov/patch

influxdb_client_3/write_client/client/write/polars_dataframe_serializer.py#L45

Added line #L45 was not covered by tests
f"Timestamp column {self.timestamp_column} not found in DataFrame. Please define a valid timestamp "
f"column.")

#
# prepare chunks
#
if chunk_size is not None:
self.number_of_chunks = int(math.ceil(len(data_frame) / float(chunk_size)))
self.chunk_size = chunk_size

Check warning on line 54 in influxdb_client_3/write_client/client/write/polars_dataframe_serializer.py

View check run for this annotation

Codecov / codecov/patch

influxdb_client_3/write_client/client/write/polars_dataframe_serializer.py#L53-L54

Added lines #L53 - L54 were not covered by tests
else:
self.number_of_chunks = None

def escape_key(self, value):
return str(value).translate(_ESCAPE_KEY)

def escape_value(self, value):
return str(value).translate(_ESCAPE_STRING)

def to_line_protocol(self, row):
# Filter out None or empty values for tags
tags = ""

tags = ",".join(
f'{self.escape_key(col)}={self.escape_key(row[self.column_indices[col]])}'
for col in self.tag_columns
if row[self.column_indices[col]] is not None and row[self.column_indices[col]] != ""
)

if self.point_settings.defaultTags:
default_tags = ",".join(

Check warning on line 75 in influxdb_client_3/write_client/client/write/polars_dataframe_serializer.py

View check run for this annotation

Codecov / codecov/patch

influxdb_client_3/write_client/client/write/polars_dataframe_serializer.py#L75

Added line #L75 was not covered by tests
f'{self.escape_key(key)}={self.escape_key(value)}'
for key, value in self.point_settings.defaultTags.items()
)
# Ensure there's a comma between existing tags and default tags if both are present
if tags and default_tags:
tags += ","
tags += default_tags

Check warning on line 82 in influxdb_client_3/write_client/client/write/polars_dataframe_serializer.py

View check run for this annotation

Codecov / codecov/patch

influxdb_client_3/write_client/client/write/polars_dataframe_serializer.py#L80-L82

Added lines #L80 - L82 were not covered by tests

# add escape symbols for special characters to tags

fields = ",".join(
f"{col}=\"{self.escape_value(row[self.column_indices[col]])}\"" if isinstance(row[self.column_indices[col]],
str)
else f"{col}={str(row[self.column_indices[col]]).lower()}" if isinstance(row[self.column_indices[col]],
bool) # Check for bool first
else f"{col}={row[self.column_indices[col]]}i" if isinstance(row[self.column_indices[col]], int)
else f"{col}={row[self.column_indices[col]]}"
for col in self.column_indices
if col not in self.tag_columns + [self.timestamp_column] and
row[self.column_indices[col]] is not None and row[self.column_indices[col]] != ""
)

# Access the Unix timestamp
timestamp = row[self.column_indices[self.timestamp_column]]
if tags != "":
line_protocol = f"{self.measurement_name},{tags} {fields} {timestamp}"
else:
line_protocol = f"{self.measurement_name} {fields} {timestamp}"

Check warning on line 103 in influxdb_client_3/write_client/client/write/polars_dataframe_serializer.py

View check run for this annotation

Codecov / codecov/patch

influxdb_client_3/write_client/client/write/polars_dataframe_serializer.py#L103

Added line #L103 was not covered by tests

return line_protocol

def serialize(self, chunk_idx: int = None):
import polars as pl

df = self.data_frame

# Check if the timestamp column is already an integer
if df[self.timestamp_column].dtype in [pl.Int32, pl.Int64]:
# The timestamp column is already an integer, assuming it's in Unix format
pass

Check warning on line 115 in influxdb_client_3/write_client/client/write/polars_dataframe_serializer.py

View check run for this annotation

Codecov / codecov/patch

influxdb_client_3/write_client/client/write/polars_dataframe_serializer.py#L115

Added line #L115 was not covered by tests
else:
# Convert timestamp to Unix timestamp based on specified precision
if self.precision in [None, 'ns']:
df = df.with_columns(
pl.col(self.timestamp_column).dt.epoch(time_unit="ns").alias(self.timestamp_column))
elif self.precision == 'us':
df = df.with_columns(

Check warning on line 122 in influxdb_client_3/write_client/client/write/polars_dataframe_serializer.py

View check run for this annotation

Codecov / codecov/patch

influxdb_client_3/write_client/client/write/polars_dataframe_serializer.py#L121-L122

Added lines #L121 - L122 were not covered by tests
pl.col(self.timestamp_column).dt.epoch(time_unit="us").alias(self.timestamp_column))
elif self.precision == 'ms':
df = df.with_columns(

Check warning on line 125 in influxdb_client_3/write_client/client/write/polars_dataframe_serializer.py

View check run for this annotation

Codecov / codecov/patch

influxdb_client_3/write_client/client/write/polars_dataframe_serializer.py#L124-L125

Added lines #L124 - L125 were not covered by tests
pl.col(self.timestamp_column).dt.epoch(time_unit="ms").alias(self.timestamp_column))
elif self.precision == 's':
df = df.with_columns(pl.col(self.timestamp_column).dt.epoch(time_unit="s").alias(self.timestamp_column))

Check warning on line 128 in influxdb_client_3/write_client/client/write/polars_dataframe_serializer.py

View check run for this annotation

Codecov / codecov/patch

influxdb_client_3/write_client/client/write/polars_dataframe_serializer.py#L127-L128

Added lines #L127 - L128 were not covered by tests
else:
raise ValueError(f"Unsupported precision: {self.precision}")

Check warning on line 130 in influxdb_client_3/write_client/client/write/polars_dataframe_serializer.py

View check run for this annotation

Codecov / codecov/patch

influxdb_client_3/write_client/client/write/polars_dataframe_serializer.py#L130

Added line #L130 was not covered by tests

if chunk_idx is None:
chunk = df
else:
logger.debug("Serialize chunk %s/%s ...", chunk_idx + 1, self.number_of_chunks)
chunk = df[chunk_idx * self.chunk_size:(chunk_idx + 1) * self.chunk_size]

Check warning on line 136 in influxdb_client_3/write_client/client/write/polars_dataframe_serializer.py

View check run for this annotation

Codecov / codecov/patch

influxdb_client_3/write_client/client/write/polars_dataframe_serializer.py#L135-L136

Added lines #L135 - L136 were not covered by tests

# Apply the UDF to each row
line_protocol_expr = chunk.apply(self.to_line_protocol, return_dtype=pl.Object)

lp = line_protocol_expr['map'].to_list()

return lp


def polars_data_frame_to_list_of_points(data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION, **kwargs):
"""
Serialize DataFrame into LineProtocols.

:param data_frame: Pandas DataFrame to serialize
:param point_settings: Default Tags
:param precision: The precision for the unix timestamps within the body line-protocol.
:key data_frame_measurement_name: name of measurement for writing Pandas DataFrame
:key data_frame_tag_columns: list of DataFrame columns which are tags, rest columns will be fields
:key data_frame_timestamp_column: name of DataFrame column which contains a timestamp. The column can be defined as a :class:`~str` value
formatted as `2018-10-26`, `2018-10-26 12:00`, `2018-10-26 12:00:00-05:00`
or other formats and types supported by `pandas.to_datetime <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.to_datetime.html#pandas.to_datetime>`_ - ``DataFrame``
:key data_frame_timestamp_timezone: name of the timezone which is used for timestamp column - ``DataFrame``
""" # noqa: E501
return PolarsDataframeSerializer(data_frame, point_settings, precision, **kwargs).serialize()
4 changes: 2 additions & 2 deletions influxdb_client_3/write_client/client/write_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@
from influxdb_client_3.write_client.domain import WritePrecision
from influxdb_client_3.write_client.client._base import _BaseWriteApi, _HAS_DATACLASS
from influxdb_client_3.write_client.client.util.helpers import get_org_query_param
from influxdb_client_3.write_client.client.write.dataframe_serializer import (DataframeSerializer,
PolarsDataframeSerializer)
from influxdb_client_3.write_client.client.write.dataframe_serializer import DataframeSerializer
from influxdb_client_3.write_client.client.write.point import Point, DEFAULT_WRITE_PRECISION
from influxdb_client_3.write_client.client.write.retry import WritesRetry
from influxdb_client_3.write_client.rest import _UTF_8_encoding
Expand Down Expand Up @@ -462,6 +461,7 @@
precision, **kwargs)

elif 'polars' in str(type(data)):
from influxdb_client_3.write_client.client.write.dataframe_serializer import PolarsDataframeSerializer

Check warning on line 464 in influxdb_client_3/write_client/client/write_api.py

View check run for this annotation

Codecov / codecov/patch

influxdb_client_3/write_client/client/write_api.py#L464

Added line #L464 was not covered by tests
serializer = PolarsDataframeSerializer(data,
self._point_settings, precision,
self._write_options.batch_size, **kwargs)
Expand Down
7 changes: 1 addition & 6 deletions influxdb_client_3/write_client/extras.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,4 @@
except ModuleNotFoundError as err:
raise ImportError(f"`data_frame` requires numpy which couldn't be imported due: {err}")

try:
import polars as pl
except ModuleNotFoundError as err:
raise ImportError(f"`polars_frame` requires polars which couldn't be imported due: {err}")

__all__ = ['pd', 'np', 'pl']
__all__ = ['pd', 'np']
4 changes: 4 additions & 0 deletions tests/data/iot.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
name,building,temperature,time
iot-devices,5a,72.3,2022-10-01T12:01:00Z
iot-devices,5a,72.1,2022-10-02T12:01:00Z
iot-devices,5a,72.2,2022-10-03T12:01:00Z
Loading
Loading