Skip to content

Commit

Permalink
Retry creating JdbcRecordCursor
Browse files Browse the repository at this point in the history
Creating PreparedStatement can fail due transient issues in remote
database. Let's retry it in the same as in JdbcClient.
  • Loading branch information
kokosing committed Sep 5, 2024
1 parent 39827a8 commit 935f9c7
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.plugin.jdbc;

import com.google.common.collect.ImmutableList;
import dev.failsafe.RetryPolicy;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.RecordCursor;
import io.trino.spi.connector.RecordSet;
Expand All @@ -22,6 +23,7 @@
import java.util.List;
import java.util.concurrent.ExecutorService;

import static io.trino.plugin.jdbc.RetryingModule.retry;
import static java.util.Objects.requireNonNull;

public class JdbcRecordSet
Expand All @@ -34,8 +36,16 @@ public class JdbcRecordSet
private final List<Type> columnTypes;
private final JdbcSplit split;
private final ConnectorSession session;
private final RetryPolicy<Object> policy;

public JdbcRecordSet(JdbcClient jdbcClient, ExecutorService executor, ConnectorSession session, JdbcSplit split, BaseJdbcConnectorTableHandle table, List<JdbcColumnHandle> columnHandles)
public JdbcRecordSet(
JdbcClient jdbcClient,
ExecutorService executor,
ConnectorSession session,
RetryPolicy<Object> policy,
JdbcSplit split,
BaseJdbcConnectorTableHandle table,
List<JdbcColumnHandle> columnHandles)
{
this.jdbcClient = requireNonNull(jdbcClient, "jdbcClient is null");
this.executor = requireNonNull(executor, "executor is null");
Expand All @@ -49,6 +59,7 @@ public JdbcRecordSet(JdbcClient jdbcClient, ExecutorService executor, ConnectorS
}
this.columnTypes = types.build();
this.session = requireNonNull(session, "session is null");
this.policy = requireNonNull(policy, "policy is null");
}

@Override
Expand All @@ -60,6 +71,6 @@ public List<Type> getColumnTypes()
@Override
public RecordCursor cursor()
{
return new JdbcRecordCursor(jdbcClient, executor, session, split, table, columnHandles);
return retry(policy, () -> new JdbcRecordCursor(jdbcClient, executor, session, split, table, columnHandles));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
import dev.failsafe.RetryPolicy;
import io.trino.plugin.base.MappedRecordSet;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorRecordSetProvider;
Expand All @@ -41,12 +42,14 @@ public class JdbcRecordSetProvider
{
private final JdbcClient jdbcClient;
private final ExecutorService executor;
private final RetryPolicy<Object> policy;

@Inject
public JdbcRecordSetProvider(JdbcClient jdbcClient, @ForRecordCursor ExecutorService executor)
public JdbcRecordSetProvider(JdbcClient jdbcClient, @ForRecordCursor ExecutorService executor, RetryPolicy<Object> policy)
{
this.jdbcClient = requireNonNull(jdbcClient, "jdbcClient is null");
this.executor = requireNonNull(executor, "executor is null");
this.policy = requireNonNull(policy, "policy is null");
}

@Override
Expand All @@ -72,6 +75,7 @@ public RecordSet getRecordSet(ConnectorTransactionHandle transaction, ConnectorS
jdbcClient,
executor,
session,
policy,
jdbcSplit,
jdbcTableHandle.intersectedWithConstraint(jdbcSplit.getDynamicFilter().transformKeys(ColumnHandle.class::cast)),
handles.build());
Expand All @@ -88,6 +92,7 @@ public RecordSet getRecordSet(ConnectorTransactionHandle transaction, ConnectorS
jdbcClient,
executor,
session,
policy,
jdbcSplit,
procedureHandle,
sourceColumns),
Expand Down

0 comments on commit 935f9c7

Please sign in to comment.