Skip to content

Commit

Permalink
🎉 Python CDK: handle requests.exceptions.ChunkedEncodingError for b…
Browse files Browse the repository at this point in the history
…roken connections (#13260)
  • Loading branch information
davydov-d authored May 27, 2022
1 parent 3d9a972 commit acd1533
Show file tree
Hide file tree
Showing 8 changed files with 35 additions and 25 deletions.
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.1.60
- Add `requests.exceptions.ChunkedEncodingError` to transient errors so it could be retried

## 0.1.59
- Add `Stream.get_error_display_message()` to retrieve user-friendly messages from exceptions encountered while reading streams.
- Add default error error message retrieval logic for `HTTPStream`s following common API patterns.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,13 @@

from .exceptions import DefaultBackoffException, UserDefinedBackoffException

TRANSIENT_EXCEPTIONS = (DefaultBackoffException, exceptions.ConnectTimeout, exceptions.ReadTimeout, exceptions.ConnectionError)
TRANSIENT_EXCEPTIONS = (
DefaultBackoffException,
exceptions.ConnectTimeout,
exceptions.ReadTimeout,
exceptions.ConnectionError,
exceptions.ChunkedEncodingError,
)

logger = logging.getLogger("airbyte")

Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

setup(
name="airbyte-cdk",
version="0.1.59",
version="0.1.60",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
24 changes: 13 additions & 11 deletions airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,20 +248,22 @@ def test_raise_on_http_errors_off_non_retryable_4xx(mocker, status_code):
assert response.status_code == status_code


def test_raise_on_http_errors_off_timeout(requests_mock):
stream = AutoFailFalseHttpStream()
requests_mock.register_uri("GET", stream.url_base, exc=requests.exceptions.ConnectTimeout)

with pytest.raises(requests.exceptions.ConnectTimeout):
list(stream.read_records(SyncMode.full_refresh))


def test_raise_on_http_errors_off_connection_error(requests_mock):
@pytest.mark.parametrize(
"error",
(
requests.exceptions.ConnectTimeout,
requests.exceptions.ConnectionError,
requests.exceptions.ChunkedEncodingError,
requests.exceptions.ReadTimeout,
),
)
def test_raise_on_http_errors(mocker, error):
stream = AutoFailFalseHttpStream()
requests_mock.register_uri("GET", stream.url_base, exc=requests.exceptions.ConnectionError)
send_mock = mocker.patch.object(requests.Session, "send", side_effect=error())

with pytest.raises(requests.exceptions.ConnectionError):
with pytest.raises(error):
list(stream.read_records(SyncMode.full_refresh))
assert send_mock.call_count == stream.max_retries + 1


class PostHttpStream(StubBasicReadHttpStream):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ protected Optional<NamingConventionTransformer> getNameTransformer() {

@Override
protected void assertNamespaceNormalization(final String testCaseId,
final String expectedNormalizedNamespace,
final String actualNormalizedNamespace) {
final String expectedNormalizedNamespace,
final String actualNormalizedNamespace) {
final String message = String.format("Test case %s failed; if this is expected, please override assertNamespaceNormalization", testCaseId);
if (testCaseId.equals("S3A-1")) {
// bigquery allows namespace starting with a number, and prepending underscore
Expand All @@ -155,9 +155,9 @@ protected List<JsonNode> retrieveNormalizedRecords(final TestDestinationEnv test

@Override
protected List<JsonNode> retrieveRecords(final TestDestinationEnv env,
final String streamName,
final String namespace,
final JsonNode streamSchema)
final String streamName,
final String namespace,
final JsonNode streamSchema)
throws Exception {
return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namingResolver.getIdentifier(namespace))
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,7 @@ class BigQueryDestinationTest {
private static Stream<Arguments> datasetIdResetterProvider() {
// parameterized test with two dataset-id patterns: `dataset_id` and `project-id:dataset_id`
return Stream.of(
Arguments.arguments(new DatasetIdResetter(config -> {
})),
Arguments.arguments(new DatasetIdResetter(config -> {})),
Arguments.arguments(new DatasetIdResetter(
config -> {
final String projectId = config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText();
Expand Down Expand Up @@ -153,9 +152,9 @@ void setup(final TestInfo info) throws IOException {

catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(
CatalogHelpers.createConfiguredAirbyteStream(USERS_STREAM_NAME, datasetId,
io.airbyte.protocol.models.Field.of("name", JsonSchemaType.STRING),
io.airbyte.protocol.models.Field
.of("id", JsonSchemaType.STRING))
io.airbyte.protocol.models.Field.of("name", JsonSchemaType.STRING),
io.airbyte.protocol.models.Field
.of("id", JsonSchemaType.STRING))
.withDestinationSyncMode(DestinationSyncMode.APPEND),
CatalogHelpers.createConfiguredAirbyteStream(TASKS_STREAM_NAME, datasetId, Field.of("goal", JsonSchemaType.STRING))));

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.bigquery;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ connectionSpecification:
- TODO
additionalProperties: false
properties:
# 'TODO: This schema defines the configuration required for the source. This usually involves metadata such as database and/or authentication information.':
# 'TODO: This schema defines the configuration required for the source. This usually involves metadata such as database and/or authentication information.':
TODO:
type: string
description: describe me

0 comments on commit acd1533

Please sign in to comment.