fix: skip infinite values during serialization #99

Merged (10 commits), Jul 8, 2024

CHANGELOG.md (1 addition, 0 deletions)

@@ -5,6 +5,7 @@
 ### Bug Fixes

 1. [#95](https://github.com/InfluxCommunity/influxdb3-python/pull/95): `Polars` is optional dependency
+1. [#99](https://github.com/InfluxCommunity/influxdb3-python/pull/99): Skip infinite values during serialization to line protocol

 ## 0.6.1 [2024-06-25]
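In practice this means a DataFrame containing ±inf no longer produces an invalid field like `temp=inf`; the offending field is simply omitted. A hypothetical sketch of the user-facing effect (the commented `client.write` call and its keyword names are indicative only, not verified against this PR):

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({
    "host": ["a", "b"],
    "temp": [21.5, np.inf],   # inf cannot be expressed in line protocol
    "load": [0.3, 0.4],
}, index=pd.to_datetime(["2024-07-01", "2024-07-02"]))

# client.write(df, data_frame_measurement_name="env",
#              data_frame_tag_columns=["host"])
# before: env,host=b temp=inf,load=0.4 1719878400000000000  <- server rejects
# after:  env,host=b load=0.4 1719878400000000000           <- inf field skipped
```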

@@ -132,7 +132,7 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION
         keys = []
         # tags holds a list of tag f-string segments ordered alphabetically by tag key.
         tags = []
-        # fields holds a list of field f-string segments ordered alphebetically by field key
+        # fields holds a list of field f-string segments ordered alphabetically by field key
         fields = []
         # field_indexes holds the index into each row of all the fields.
         field_indexes = []
@@ -160,6 +160,11 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION
         # null_columns has a bool value for each column holding
         # whether that column contains any null (NaN or None) values.
         null_columns = data_frame.isnull().any()
+
+        # inf_columns has a bool value for each column holding
+        # whether that column contains any Inf values.
+        inf_columns = data_frame.isin([np.inf, -np.inf]).any()
+
         timestamp_index = 0

         # Iterate through the columns building up the expression for each column.
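Like the existing `null_columns` check, `isin([np.inf, -np.inf]).any()` reduces to one boolean per column, so per-row infinity handling is only generated for columns that actually need it. A minimal sketch:

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({
    "temp": [21.5, np.inf, 22.0],
    "load": [0.3, 0.4, 0.5],
})

# One bool per column: does the column contain +inf or -inf anywhere?
inf_columns = df.isin([np.inf, -np.inf]).any()
print(inf_columns)
# temp     True
# load    False
# dtype: bool
```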
@@ -175,9 +180,10 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION

             if key in data_frame_tag_columns:
                 # This column is a tag column.
-                if null_columns.iloc[index]:
+                if null_columns.iloc[index] or inf_columns.iloc[index]:
                     key_value = f"""{{
-                        '' if {val_format} == '' or pd.isna({val_format}) else
+                        '' if {val_format} == '' or pd.isna({val_format}) or
+                        ({inf_columns.iloc[index]} and np.isinf({val_format})) else
                         f',{key_format}={{str({val_format}).translate(_ESCAPE_STRING)}}'
                     }}"""
                 else:
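A rough stand-in for what the compiled tag expression evaluates to per row (the `tag_segment` helper is hypothetical, and the real generated code also escapes the value via `_ESCAPE_STRING`): empty, NaN, and infinite tag values all collapse to `''`, so the tag is omitted from the line instead of serializing as something like `rack=inf`.

```python
import math

def tag_segment(key, val):
    # Mirrors the generated ternary: skip '', NaN, and +/-inf tag values.
    is_nan = isinstance(val, float) and math.isnan(val)
    is_inf = isinstance(val, float) and math.isinf(val)
    return '' if val == '' or is_nan or is_inf else f',{key}={val}'

print(tag_segment('rack', 'r1'))           # ,rack=r1
print(tag_segment('rack', float('inf')))   # '' (tag dropped)
```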
@@ -199,16 +205,17 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION
                 if (issubclass(value.type, np.integer) or issubclass(value.type, np.floating) or
                         issubclass(value.type, np.bool_)):
                     suffix = 'i' if issubclass(value.type, np.integer) else ''
-                    if null_columns.iloc[index]:
+                    if null_columns.iloc[index] or inf_columns.iloc[index]:
                         field_value = (
-                            f"""{{"" if pd.isna({val_format}) else f"{sep}{key_format}={{{val_format}}}{suffix}"}}"""
+                            f"""{{"" if pd.isna({val_format}) or ({inf_columns.iloc[index]} and np.isinf({val_format})) else
+                            f"{sep}{key_format}={{{val_format}}}{suffix}"}}"""
                         )
                     else:
                         field_value = f'{sep}{key_format}={{{val_format}}}{suffix}'
                 else:
-                    if null_columns.iloc[index]:
+                    if null_columns.iloc[index] or inf_columns.iloc[index]:
                         field_value = f"""{{
-                            '' if pd.isna({val_format}) else
+                            '' if pd.isna({val_format}) or ({inf_columns.iloc[index]} and np.isinf({val_format})) else
                             f'{sep}{key_format}="{{str({val_format}).translate(_ESCAPE_STRING)}}"'
                         }}"""
                     else:
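Worth noting why `{inf_columns.iloc[index]}` is interpolated into the template rather than calling `np.isinf` unconditionally: the column-level check is baked into the generated source as a literal `True` or `False`, so for columns with no infinities the per-row `np.isinf` call is short-circuited away entirely, and it is never invoked on values it cannot handle (e.g. strings in an object column). A hedged illustration:

```python
import numpy as np

val = "server-1"                 # object-dtype value in a null-only column
print(False and np.isinf(val))   # False; np.isinf never runs
# np.isinf(val)                  # would raise TypeError for a str
```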
@@ -234,11 +241,12 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION
             '_ESCAPE_STRING': _ESCAPE_STRING,
             'keys': keys,
             'pd': pd,
+            'np': np,
         })

         for k, v in dict(data_frame.dtypes).items():
             if k in data_frame_tag_columns:
-                data_frame[k].replace('', np.nan, inplace=True)
+                data_frame[k] = data_frame[k].apply(lambda x: np.nan if x == '' else x)

         self.data_frame = data_frame
         self.f = f
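Two related tweaks here: `np` is added to the compiled expression's namespace because the generated f-strings now call `np.isinf`, and the in-place `replace` on a selected column is swapped for an explicit reassignment, presumably to avoid pandas chained-assignment/Copy-on-Write pitfalls. A small sketch of the difference, under that assumption:

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({"host": ["a", "", "b"]})

# Chained in-place mutation: under Copy-on-Write semantics, df["host"] can be
# a temporary, so inplace=True may warn and silently fail to update df.
# df["host"].replace("", np.nan, inplace=True)

# Reassigning the transformed column always updates the frame.
df["host"] = df["host"].apply(lambda x: np.nan if x == "" else x)
print(df["host"].tolist())  # ['a', nan, 'b']
```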
@@ -136,7 +136,7 @@ def serialize(self, chunk_idx: int = None):
         chunk = df[chunk_idx * self.chunk_size:(chunk_idx + 1) * self.chunk_size]

         # Apply the UDF to each row
-        line_protocol_expr = chunk.apply(self.to_line_protocol, return_dtype=pl.Object)
+        line_protocol_expr = chunk.map_rows(self.to_line_protocol, return_dtype=pl.Object)

         lp = line_protocol_expr['map'].to_list()
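This last change tracks a Polars rename: `DataFrame.apply` was deprecated in favour of `DataFrame.map_rows` (around Polars 0.19), with unchanged behaviour for row-wise UDFs. Scalar results come back in a column named `"map"`, which is what the existing `line_protocol_expr['map']` lookup relies on. A minimal sketch:

```python
import polars as pl

df = pl.DataFrame({"measurement": ["cpu", "mem"], "value": [0.5, 0.7]})

# Each row arrives as a tuple; scalar results land in a column named "map".
out = df.map_rows(lambda row: f"{row[0]} value={row[1]}", return_dtype=pl.Object)
print(out["map"].to_list())  # ['cpu value=0.5', 'mem value=0.7']
```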
