Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add retries to restore dataframe #408

Merged
1 change: 1 addition & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Version 3.19.0 (2021-02-XY)
state after the update
* Expose compression type and row group chunk size in Cube interface via optional
parameter of type :class:`~kartothek.serialization.ParquetSerializer`.
* Add retries to :func:`~kartothek.serialization.parquet.ParquetSerializer.restore_dataframe`
NeroCorleone marked this conversation as resolved.
Show resolved Hide resolved

Version 3.18.0 (2021-01-25)
===========================
Expand Down
12 changes: 10 additions & 2 deletions kartothek/serialization/_io_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@
_logger = logging.getLogger(__name__)


class BufferReadError(IOError):
"""
Internal kartothek error while attempting to read from buffer
"""

pass


class BlockBuffer(io.BufferedIOBase):
"""
Block-based buffer.
Expand Down Expand Up @@ -111,7 +119,7 @@ def _fetch_blocks(self, block, n):
f"Expected raw read to return {size} bytes, but instead got {len(data)}"
)
_logger.error(err)
raise AssertionError(err)
raise BufferReadError(err)

# fill blocks
for i in range(n):
Expand All @@ -135,7 +143,7 @@ def _ensure_range_loaded(self, start, size):
if size < 0:
msg = f"Expected size >= 0, but got start={start}, size={size}"
_logger.error(msg)
raise AssertionError(msg)
raise BufferReadError(msg)

block = start // self._blocksize
offset = start % self._blocksize
Expand Down
70 changes: 68 additions & 2 deletions kartothek/serialization/_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@


import datetime
import logging
import time
from typing import Iterable, Optional

import numpy as np
Expand Down Expand Up @@ -33,8 +35,12 @@
except ImportError:
HAVE_BOTO = False

_logger = logging.getLogger(__name__)


EPOCH_ORDINAL = datetime.date(1970, 1, 1).toordinal()
MAX_NB_RETRIES = 6 # longest retry backoff = BACKOFF_TIME * 2**(MAX_NB_RETRIES - 2)
BACKOFF_TIME = 0.01 # 10 ms


def _empty_table_from_schema(parquet_file):
Expand Down Expand Up @@ -65,6 +71,14 @@ def _reset_dictionary_columns(table, exclude=None):
return table


class ParquetReadError(IOError):
"""
Internal kartothek error while attempting to read Parquet file
"""

pass


class ParquetSerializer(DataFrameSerializer):
_PARQUET_VERSION = "2.0"
type_stable = True
Expand Down Expand Up @@ -98,7 +112,7 @@ def __repr__(self):
)

@staticmethod
def restore_dataframe(
def _restore_dataframe(
store: KeyValueStore,
key: str,
filter_query: Optional[str] = None,
Expand All @@ -107,7 +121,7 @@ def restore_dataframe(
categories: Optional[Iterable[str]] = None,
predicates: Optional[PredicatesType] = None,
date_as_object: bool = False,
):
) -> pd.DataFrame:
check_predicates(predicates)
# If we want to do columnar access we can benefit from partial reads
# otherwise full read en block is the better option.
Expand Down Expand Up @@ -187,6 +201,58 @@ def restore_dataframe(
else:
return df

@classmethod
def restore_dataframe(
cls,
store: KeyValueStore,
key: str,
filter_query: Optional[str] = None,
columns: Optional[Iterable[str]] = None,
predicate_pushdown_to_io: bool = True,
categories: Optional[Iterable[str]] = None,
predicates: Optional[PredicatesType] = None,
date_as_object: bool = False,
) -> pd.DataFrame:
# https://github.com/JDASoftwareGroup/kartothek/issues/407 We have been seeing weird `IOError`s while reading
# Parquet files from Azure Blob Store. These errors have caused long running computations to fail.
# The workaround is to retry the serialization here and gain more stability for long running tasks.
# This code should not live forever, it should be removed once the underlying cause has been resolved.
for nb_retry in range(MAX_NB_RETRIES):
try:
return cls._restore_dataframe(
store=store,
key=key,
filter_query=filter_query,
columns=columns,
predicate_pushdown_to_io=predicate_pushdown_to_io,
categories=categories,
predicates=predicates,
date_as_object=date_as_object,
)
# We only retry OSErrors (note that IOError inherits from OSError), as these kind of errors may benefit
# from retries.
except OSError as err:
raised_error = err
_logger.warning(
msg=(
f"Failed to restore dataframe, attempt {nb_retry + 1} of {MAX_NB_RETRIES} with parameters "
f"key: {key}, filter_query: {filter_query}, columns: {columns}, "
f"predicate_pushdown_to_io: {predicate_pushdown_to_io}, categories: {categories}, "
f"predicates: {predicates}, date_as_object: {date_as_object}."
),
exc_info=True,
)
# we don't sleep when we're done with the last attempt
if nb_retry < (MAX_NB_RETRIES - 1):
time.sleep(BACKOFF_TIME * 2 ** nb_retry)

raise ParquetReadError(
f"Failed to restore dataframe after {MAX_NB_RETRIES} attempts. Parameters: "
f"key: {key}, filter_query: {filter_query}, columns: {columns}, "
f"predicate_pushdown_to_io: {predicate_pushdown_to_io}, categories: {categories}, "
f"date_as_object: {date_as_object}, predicates: {predicates}."
) from raised_error

def store(self, store, key_prefix, df):
key = "{}.parquet".format(key_prefix)
if isinstance(df, pa.Table):
Expand Down
54 changes: 54 additions & 0 deletions tests/serialization/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

from kartothek.serialization import DataFrameSerializer, ParquetSerializer
from kartothek.serialization._parquet import (
MAX_NB_RETRIES,
ParquetReadError,
_predicate_accepts,
_reset_dictionary_columns,
)
Expand Down Expand Up @@ -459,3 +461,55 @@ def test_reset_dict_cols(store):
only_a_reset = _reset_dictionary_columns(table, exclude=["colB"]).schema
assert not pa.types.is_dictionary(only_a_reset.field("col").type)
assert pa.types.is_dictionary(only_a_reset.field("colB").type)


def test_retry_on_IOError(monkeypatch, caplog, store):
"""
See https://github.com/JDASoftwareGroup/kartothek/issues/407 :
We are testing a retry-workaround for the above issue here. Once the issue is resolved,
this test and the workaround can be removed.
"""

def patched__restore_dataframe(**kwargs):
NeroCorleone marked this conversation as resolved.
Show resolved Hide resolved
# This kind of exception should be captured by the retry mechanism.
raise IOError()

df = pd.DataFrame({"A": [0, 1, 2, 3]})
monkeypatch.setattr(
ParquetSerializer, "_restore_dataframe", patched__restore_dataframe
)
serializer = ParquetSerializer()
key = serializer.store(store, "key", df)

with pytest.raises(ParquetReadError):
serializer.restore_dataframe(store=store, key=key)

assert len(caplog.records) == MAX_NB_RETRIES
for log_record in caplog.records:
assert "Failed to restore dataframe" in log_record.message


def test_retry_fail_on_other_error(monkeypatch, caplog, store):
"""
See https://github.com/JDASoftwareGroup/kartothek/issues/407 :
We are testing a retry-workaround for the above issue here. Once the issue is resolved,
this test and the workaround can be removed.

We only want to retry on OSErrors (and inherited exceptions) -- all other exceptions should be raised.
"""

def patched__restore_dataframe(**kwargs):
# This should not be retried but raised immediately.
raise ValueError()

df = pd.DataFrame({"A": [0, 1, 2, 3]})
monkeypatch.setattr(
ParquetSerializer, "_restore_dataframe", patched__restore_dataframe
)
serializer = ParquetSerializer()
key = serializer.store(store, "key", df)

with pytest.raises(ValueError):
serializer.restore_dataframe(store=store, key=key)

assert len(caplog.records) == 0