diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/CachingJdbcClient.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/CachingJdbcClient.java index 44f1a81ecba8d9..a0e8f0f473becf 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/CachingJdbcClient.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/CachingJdbcClient.java @@ -93,7 +93,7 @@ public class CachingJdbcClient @Inject public CachingJdbcClient( - @StatsCollecting JdbcClient delegate, + RetryingJdbcClient delegate, Set sessionPropertiesProviders, IdentityCacheMapping identityMapping, BaseJdbcConfig config) diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcModule.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcModule.java index bf354ee3517817..84ac535615a56d 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcModule.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcModule.java @@ -51,7 +51,7 @@ public void setup(Binder binder) install(new JdbcDiagnosticModule()); install(new IdentifierMappingModule()); install(new RemoteQueryModifierModule()); - install(new RetryingConnectionFactoryModule()); + install(new RetryingModule()); newOptionalBinder(binder, ConnectorAccessControl.class); newOptionalBinder(binder, QueryBuilder.class).setDefault().to(DefaultQueryBuilder.class).in(Scopes.SINGLETON); diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryStrategy.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryStrategy.java new file mode 100644 index 00000000000000..183912a9706f2c --- /dev/null +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryStrategy.java @@ -0,0 +1,19 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.jdbc; + +public interface RetryStrategy +{ + boolean isExceptionRecoverable(Throwable exception); +} diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingConnectionFactory.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingConnectionFactory.java index f2003a7002a8e6..2a8220c856774a 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingConnectionFactory.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingConnectionFactory.java @@ -13,49 +13,29 @@ */ package io.trino.plugin.jdbc; -import com.google.common.base.Throwables; import com.google.inject.Inject; import dev.failsafe.Failsafe; import dev.failsafe.FailsafeException; import dev.failsafe.RetryPolicy; import io.trino.plugin.jdbc.jmx.StatisticsAwareConnectionFactory; -import io.trino.spi.TrinoException; import io.trino.spi.connector.ConnectorSession; import java.sql.Connection; import java.sql.SQLException; -import java.sql.SQLTransientException; -import java.util.Set; -import static java.time.temporal.ChronoUnit.MILLIS; -import static java.time.temporal.ChronoUnit.SECONDS; import static java.util.Objects.requireNonNull; public class RetryingConnectionFactory implements ConnectionFactory { - private final RetryPolicy retryPolicy; - private final ConnectionFactory delegate; + private final RetryPolicy retryPolicy; @Inject - public RetryingConnectionFactory(StatisticsAwareConnectionFactory delegate, Set retryStrategies) + public RetryingConnectionFactory(StatisticsAwareConnectionFactory delegate, RetryPolicy retryPolicy) { - requireNonNull(retryStrategies); this.delegate = requireNonNull(delegate, "delegate is null"); - this.retryPolicy = RetryPolicy.builder() - .withMaxDuration(java.time.Duration.of(30, SECONDS)) - .withMaxAttempts(5) - .withBackoff(50, 5_000, MILLIS, 4) - .handleIf(throwable -> isExceptionRecoverable(retryStrategies, throwable)) - .abortOn(TrinoException.class) - .build(); - } - - private static boolean isExceptionRecoverable(Set retryStrategies, Throwable throwable) - { - return retryStrategies.stream() - .anyMatch(retryStrategy -> retryStrategy.isExceptionRecoverable(throwable)); + this.retryPolicy = requireNonNull(retryPolicy, "retryPolicy is null"); } @Override @@ -80,20 +60,4 @@ public void close() { delegate.close(); } - - public interface RetryStrategy - { - boolean isExceptionRecoverable(Throwable exception); - } - - public static class DefaultRetryStrategy - implements RetryStrategy - { - @Override - public boolean isExceptionRecoverable(Throwable exception) - { - return Throwables.getCausalChain(exception).stream() - .anyMatch(SQLTransientException.class::isInstance); - } - } } diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingConnectionFactoryModule.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingConnectionFactoryModule.java deleted file mode 100644 index bc047b5386ea80..00000000000000 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingConnectionFactoryModule.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.plugin.jdbc; - -import com.google.inject.AbstractModule; -import com.google.inject.Scopes; -import io.trino.plugin.jdbc.RetryingConnectionFactory.DefaultRetryStrategy; -import io.trino.plugin.jdbc.RetryingConnectionFactory.RetryStrategy; - -import static com.google.inject.multibindings.Multibinder.newSetBinder; - -public class RetryingConnectionFactoryModule - extends AbstractModule -{ - @Override - public void configure() - { - bind(RetryingConnectionFactory.class).in(Scopes.SINGLETON); - newSetBinder(binder(), RetryStrategy.class).addBinding().to(DefaultRetryStrategy.class).in(Scopes.SINGLETON); - } -} diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingJdbcClient.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingJdbcClient.java new file mode 100644 index 00000000000000..240ac46bae9239 --- /dev/null +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingJdbcClient.java @@ -0,0 +1,527 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.jdbc; + +import com.google.inject.Inject; +import dev.failsafe.RetryPolicy; +import io.trino.plugin.jdbc.JdbcProcedureHandle.ProcedureQuery; +import io.trino.plugin.jdbc.expression.ParameterizedExpression; +import io.trino.spi.connector.AggregateFunction; +import io.trino.spi.connector.ColumnHandle; +import io.trino.spi.connector.ColumnMetadata; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorSplitSource; +import io.trino.spi.connector.ConnectorTableMetadata; +import io.trino.spi.connector.JoinStatistics; +import io.trino.spi.connector.JoinType; +import io.trino.spi.connector.RelationColumnsMetadata; +import io.trino.spi.connector.RelationCommentMetadata; +import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.connector.SystemTable; +import io.trino.spi.connector.TableScanRedirectApplicationResult; +import io.trino.spi.expression.ConnectorExpression; +import io.trino.spi.statistics.TableStatistics; +import io.trino.spi.type.Type; + +import java.sql.CallableStatement; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; +import java.util.Set; + +import static io.trino.plugin.jdbc.RetryingModule.retry; +import static java.util.Objects.requireNonNull; + +public class RetryingJdbcClient + implements JdbcClient +{ + private final JdbcClient delegate; + private final RetryPolicy policy; + + @Inject + public RetryingJdbcClient(@StatsCollecting JdbcClient delegate, RetryPolicy policy) + { + this.delegate = requireNonNull(delegate, "delegate is null"); + this.policy = requireNonNull(policy, "policy is null"); + } + + @Override + public boolean schemaExists(ConnectorSession session, String schema) + { + return retry(policy, () -> delegate.schemaExists(session, schema)); + } + + @Override + public Set getSchemaNames(ConnectorSession session) + { + return retry(policy, () -> delegate.getSchemaNames(session)); + } + + @Override + public List getTableNames(ConnectorSession session, Optional schema) + { + return retry(policy, () -> delegate.getTableNames(session, schema)); + } + + @Override + public Optional getTableHandle(ConnectorSession session, SchemaTableName schemaTableName) + { + return retry(policy, () -> delegate.getTableHandle(session, schemaTableName)); + } + + @Override + public JdbcTableHandle getTableHandle(ConnectorSession session, PreparedQuery preparedQuery) + { + return retry(policy, () -> delegate.getTableHandle(session, preparedQuery)); + } + + @Override + public JdbcProcedureHandle getProcedureHandle(ConnectorSession session, ProcedureQuery procedureQuery) + { + return retry(policy, () -> delegate.getProcedureHandle(session, procedureQuery)); + } + + @Override + public List getColumns(ConnectorSession session, JdbcTableHandle tableHandle) + { + return retry(policy, () -> delegate.getColumns(session, tableHandle)); + } + + @Override + public Iterator getAllTableColumns(ConnectorSession session, Optional schema) + { + return retry(policy, () -> delegate.getAllTableColumns(session, schema)); + } + + @Override + public List getAllTableComments(ConnectorSession session, Optional schema) + { + return retry(policy, () -> delegate.getAllTableComments(session, schema)); + } + + @Override + public Optional toColumnMapping(ConnectorSession session, Connection connection, JdbcTypeHandle typeHandle) + { + // no retrying as it could be not idempotent operation (connection could be not reusable after the first failure) + return delegate.toColumnMapping(session, connection, typeHandle); + } + + @Override + public List toColumnMappings(ConnectorSession session, List typeHandles) + { + // there should be no remote database interaction + return delegate.toColumnMappings(session, typeHandles); + } + + @Override + public WriteMapping toWriteMapping(ConnectorSession session, Type type) + { + // there should be no remote database interaction + return delegate.toWriteMapping(session, type); + } + + @Override + public Optional getSupportedType(ConnectorSession session, Type type) + { + // there should be no remote database interaction + return delegate.getSupportedType(session, type); + } + + @Override + public boolean supportsAggregationPushdown(ConnectorSession session, JdbcTableHandle table, List aggregates, Map assignments, List> groupingSets) + { + // there should be no remote database interaction + return delegate.supportsAggregationPushdown(session, table, aggregates, assignments, groupingSets); + } + + @Override + public Optional implementAggregation(ConnectorSession session, AggregateFunction aggregate, Map assignments) + { + // there should be no remote database interaction + return delegate.implementAggregation(session, aggregate, assignments); + } + + @Override + public Optional convertPredicate(ConnectorSession session, ConnectorExpression expression, Map assignments) + { + // there should be no remote database interaction + return delegate.convertPredicate(session, expression, assignments); + } + + @Override + public Optional convertProjection(ConnectorSession session, JdbcTableHandle handle, ConnectorExpression expression, Map assignments) + { + // there should be no remote database interaction + return delegate.convertProjection(session, handle, expression, assignments); + } + + @Override + public ConnectorSplitSource getSplits(ConnectorSession session, JdbcTableHandle tableHandle) + { + return retry(policy, () -> delegate.getSplits(session, tableHandle)); + } + + @Override + public ConnectorSplitSource getSplits(ConnectorSession session, JdbcProcedureHandle procedureHandle) + { + return retry(policy, () -> delegate.getSplits(session, procedureHandle)); + } + + @Override + public Connection getConnection(ConnectorSession session, JdbcSplit split, JdbcTableHandle tableHandle) + throws SQLException + { + // retry already implemented by RetryingConnectionFactory + return delegate.getConnection(session, split, tableHandle); + } + + @Override + public Connection getConnection(ConnectorSession session, JdbcSplit split, JdbcProcedureHandle procedureHandle) + throws SQLException + { + // retry already implemented by RetryingConnectionFactory + return delegate.getConnection(session, split, procedureHandle); + } + + @Override + public void abortReadConnection(Connection connection, ResultSet resultSet) + throws SQLException + { + // no retrying as it could be not idempotent operation (connection could be not reusable after the first failure) + delegate.abortReadConnection(connection, resultSet); + } + + @Override + public PreparedQuery prepareQuery(ConnectorSession session, JdbcTableHandle table, Optional>> groupingSets, List columns, Map columnExpressions) + { + // there should be no remote database interaction + return delegate.prepareQuery(session, table, groupingSets, columns, columnExpressions); + } + + @Override + public PreparedStatement buildSql(ConnectorSession session, Connection connection, JdbcSplit split, JdbcTableHandle table, List columns) + throws SQLException + { + // no retrying as it could be not idempotent operation (connection could be not reusable after the first failure) + return delegate.buildSql(session, connection, split, table, columns); + } + + @Override + public CallableStatement buildProcedure(ConnectorSession session, Connection connection, JdbcSplit split, JdbcProcedureHandle procedureHandle) + throws SQLException + { + // no retrying as it could be not idempotent operation (connection could be not reusable after the first failure) + return delegate.buildProcedure(session, connection, split, procedureHandle); + } + + @Override + public Optional implementJoin(ConnectorSession session, JoinType joinType, PreparedQuery leftSource, Map leftProjections, PreparedQuery rightSource, Map rightProjections, List joinConditions, JoinStatistics statistics) + { + // there should be no remote database interaction + return delegate.implementJoin(session, joinType, leftSource, leftProjections, rightSource, rightProjections, joinConditions, statistics); + } + + @Override + public Optional legacyImplementJoin(ConnectorSession session, JoinType joinType, PreparedQuery leftSource, PreparedQuery rightSource, List joinConditions, Map rightAssignments, Map leftAssignments, JoinStatistics statistics) + { + // there should be no remote database interaction + return delegate.legacyImplementJoin(session, joinType, leftSource, rightSource, joinConditions, rightAssignments, leftAssignments, statistics); + } + + @Override + public boolean supportsTopN(ConnectorSession session, JdbcTableHandle handle, List sortOrder) + { + // there should be no remote database interaction + return delegate.supportsTopN(session, handle, sortOrder); + } + + @Override + public boolean isTopNGuaranteed(ConnectorSession session) + { + // there should be no remote database interaction + return delegate.isTopNGuaranteed(session); + } + + @Override + public boolean supportsLimit() + { + // there should be no remote database interaction + return delegate.supportsLimit(); + } + + @Override + public boolean isLimitGuaranteed(ConnectorSession session) + { + // there should be no remote database interaction + return delegate.isLimitGuaranteed(session); + } + + @Override + public Optional getTableComment(ResultSet resultSet) + throws SQLException + { + // no retrying as it could be not idempotent operation + return delegate.getTableComment(resultSet); + } + + @Override + public void setTableComment(ConnectorSession session, JdbcTableHandle handle, Optional comment) + { + // no retrying as it could be not idempotent operation + delegate.setTableComment(session, handle, comment); + } + + @Override + public void setColumnComment(ConnectorSession session, JdbcTableHandle handle, JdbcColumnHandle column, Optional comment) + { + // no retrying as it could be not idempotent operation + delegate.setColumnComment(session, handle, column, comment); + } + + @Override + public void addColumn(ConnectorSession session, JdbcTableHandle handle, ColumnMetadata column) + { + // no retrying as it could be not idempotent operation + delegate.addColumn(session, handle, column); + } + + @Override + public void dropColumn(ConnectorSession session, JdbcTableHandle handle, JdbcColumnHandle column) + { + // no retrying as it could be not idempotent operation + delegate.dropColumn(session, handle, column); + } + + @Override + public void renameColumn(ConnectorSession session, JdbcTableHandle handle, JdbcColumnHandle jdbcColumn, String newColumnName) + { + // no retrying as it could be not idempotent operation + delegate.renameColumn(session, handle, jdbcColumn, newColumnName); + } + + @Override + public void setColumnType(ConnectorSession session, JdbcTableHandle handle, JdbcColumnHandle column, Type type) + { + // no retrying as it could be not idempotent operation + delegate.setColumnType(session, handle, column, type); + } + + @Override + public void dropNotNullConstraint(ConnectorSession session, JdbcTableHandle handle, JdbcColumnHandle column) + { + // no retrying as it could be not idempotent operation + delegate.dropNotNullConstraint(session, handle, column); + } + + @Override + public void renameTable(ConnectorSession session, JdbcTableHandle handle, SchemaTableName newTableName) + { + // no retrying as it could be not idempotent operation + delegate.renameTable(session, handle, newTableName); + } + + @Override + public void setTableProperties(ConnectorSession session, JdbcTableHandle handle, Map> properties) + { + // no retrying as it could be not idempotent operation + delegate.setTableProperties(session, handle, properties); + } + + @Override + public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata) + { + // no retrying as it could be not idempotent operation + delegate.createTable(session, tableMetadata); + } + + @Override + public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata) + { + // no retrying as it could be not idempotent operation + return delegate.beginCreateTable(session, tableMetadata); + } + + @Override + public void commitCreateTable(ConnectorSession session, JdbcOutputTableHandle handle, Set pageSinkIds) + { + // no retrying as it could be not idempotent operation + delegate.commitCreateTable(session, handle, pageSinkIds); + } + + @Override + public JdbcOutputTableHandle beginInsertTable(ConnectorSession session, JdbcTableHandle tableHandle, List columns) + { + // no retrying as it could be not idempotent operation + return delegate.beginInsertTable(session, tableHandle, columns); + } + + @Override + public void finishInsertTable(ConnectorSession session, JdbcOutputTableHandle handle, Set pageSinkIds) + { + // no retrying as it could be not idempotent operation + delegate.finishInsertTable(session, handle, pageSinkIds); + } + + @Override + public void dropTable(ConnectorSession session, JdbcTableHandle jdbcTableHandle) + { + // no retrying as it could be not idempotent operation + delegate.dropTable(session, jdbcTableHandle); + } + + @Override + public void rollbackCreateTable(ConnectorSession session, JdbcOutputTableHandle handle) + { + // no retrying as it could be not idempotent operation + delegate.rollbackCreateTable(session, handle); + } + + @Override + public boolean supportsRetries() + { + // there should be no remote database interaction + return delegate.supportsRetries(); + } + + @Override + public String buildInsertSql(JdbcOutputTableHandle handle, List columnWriters) + { + // there should be no remote database interaction + return delegate.buildInsertSql(handle, columnWriters); + } + + @Override + public Connection getConnection(ConnectorSession session) + throws SQLException + { + // retry already implemented by RetryingConnectionFactory + return delegate.getConnection(session); + } + + @Override + public Connection getConnection(ConnectorSession session, JdbcOutputTableHandle handle) + throws SQLException + { + // retry already implemented by RetryingConnectionFactory + return delegate.getConnection(session); + } + + @Override + public PreparedStatement getPreparedStatement(Connection connection, String sql, Optional columnCount) + throws SQLException + { + // no retrying as it could be not idempotent operation (connection could be not reusable after the first failure) + return delegate.getPreparedStatement(connection, sql, columnCount); + } + + @Override + public TableStatistics getTableStatistics(ConnectorSession session, JdbcTableHandle handle) + { + return retry(policy, () -> delegate.getTableStatistics(session, handle)); + } + + @Override + public void createSchema(ConnectorSession session, String schemaName) + { + // no retrying as it could be not idempotent operation + delegate.createSchema(session, schemaName); + } + + @Override + public void dropSchema(ConnectorSession session, String schemaName, boolean cascade) + { + // no retrying as it could be not idempotent operation + delegate.dropSchema(session, schemaName, cascade); + } + + @Override + public void renameSchema(ConnectorSession session, String schemaName, String newSchemaName) + { + // no retrying as it could be not idempotent operation + delegate.renameSchema(session, schemaName, newSchemaName); + } + + @Override + public Optional getSystemTable(ConnectorSession session, SchemaTableName tableName) + { + // there should be no remote database interaction + return delegate.getSystemTable(session, tableName); + } + + @Override + public String quoted(String name) + { + // there should be no remote database interaction + return delegate.quoted(name); + } + + @Override + public String quoted(RemoteTableName remoteTableName) + { + // there should be no remote database interaction + return delegate.quoted(remoteTableName); + } + + @Override + public Map getTableProperties(ConnectorSession session, JdbcTableHandle tableHandle) + { + return retry(policy, () -> delegate.getTableProperties(session, tableHandle)); + } + + @Override + public Optional getTableScanRedirection(ConnectorSession session, JdbcTableHandle tableHandle) + { + return retry(policy, () -> delegate.getTableScanRedirection(session, tableHandle)); + } + + @Override + public OptionalLong delete(ConnectorSession session, JdbcTableHandle handle) + { + // no retrying as it could be not idempotent operation + return delegate.delete(session, handle); + } + + @Override + public void truncateTable(ConnectorSession session, JdbcTableHandle handle) + { + // no retrying as it could be not idempotent operation + delegate.truncateTable(session, handle); + } + + @Override + public OptionalLong update(ConnectorSession session, JdbcTableHandle handle) + { + // no retrying as it could be not idempotent operation + return delegate.update(session, handle); + } + + @Override + public OptionalInt getMaxWriteParallelism(ConnectorSession session) + { + return retry(policy, () -> delegate.getMaxWriteParallelism(session)); + } + + @Override + public OptionalInt getMaxColumnNameLength(ConnectorSession session) + { + return retry(policy, () -> delegate.getMaxColumnNameLength(session)); + } +} diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingModule.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingModule.java new file mode 100644 index 00000000000000..4b66b93ef77844 --- /dev/null +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingModule.java @@ -0,0 +1,88 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.jdbc; + +import com.google.common.base.Throwables; +import com.google.inject.AbstractModule; +import com.google.inject.Provides; +import com.google.inject.Scopes; +import com.google.inject.Singleton; +import dev.failsafe.Failsafe; +import dev.failsafe.FailsafeException; +import dev.failsafe.RetryPolicy; +import dev.failsafe.function.CheckedSupplier; +import io.trino.spi.TrinoException; + +import java.sql.SQLTransientException; +import java.util.Set; + +import static com.google.inject.multibindings.Multibinder.newSetBinder; +import static java.time.temporal.ChronoUnit.MILLIS; +import static java.time.temporal.ChronoUnit.SECONDS; + +public class RetryingModule + extends AbstractModule +{ + @Override + public void configure() + { + bind(RetryingConnectionFactory.class).in(Scopes.SINGLETON); + bind(RetryingJdbcClient.class).in(Scopes.SINGLETON); + newSetBinder(binder(), RetryStrategy.class).addBinding().to(RetryOnSqlTransientException.class).in(Scopes.SINGLETON); + } + + @Provides + @Singleton + public RetryPolicy createRetryPolicy(Set retryStrategies) + { + return RetryPolicy.builder() + .withMaxDuration(java.time.Duration.of(30, SECONDS)) + .withMaxAttempts(5) + .withBackoff(50, 5_000, MILLIS, 4) + .handleIf(throwable -> isExceptionRecoverable(retryStrategies, throwable)) + .abortOn(TrinoException.class) + .build(); + } + + public static T retry(RetryPolicy policy, CheckedSupplier supplier) + { + try { + return Failsafe.with(policy) + .get(supplier); + } + catch (FailsafeException ex) { + if (ex.getCause() instanceof TrinoException) { + throw (TrinoException) ex.getCause(); + } + throw ex; + } + } + + private static boolean isExceptionRecoverable(Set retryStrategies, Throwable throwable) + { + return retryStrategies.stream() + .anyMatch(retryStrategy -> retryStrategy.isExceptionRecoverable(throwable)); + } + + private static class RetryOnSqlTransientException + implements RetryStrategy + { + @Override + public boolean isExceptionRecoverable(Throwable exception) + { + return Throwables.getCausalChain(exception).stream() + .anyMatch(SQLTransientException.class::isInstance); + } + } +} diff --git a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestLazyConnectionFactory.java b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestLazyConnectionFactory.java index 797faadd65b357..508d45b559c8a0 100644 --- a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestLazyConnectionFactory.java +++ b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestLazyConnectionFactory.java @@ -37,7 +37,7 @@ public void testNoConnectionIsCreated() session -> { throw new AssertionError("Expected no connection creation"); }); - binder.install(new RetryingConnectionFactoryModule()); + binder.install(new RetryingModule()); }); try (LazyConnectionFactory lazyConnectionFactory = injector.getInstance(LazyConnectionFactory.class); @@ -56,7 +56,7 @@ public void testConnectionCannotBeReusedAfterClose() Injector injector = Guice.createInjector(binder -> { binder.bind(ConnectionFactory.class).annotatedWith(ForBaseJdbc.class).toInstance( DriverConnectionFactory.builder(new Driver(), config.getConnectionUrl(), new EmptyCredentialProvider()).build()); - binder.install(new RetryingConnectionFactoryModule()); + binder.install(new RetryingModule()); }); try (LazyConnectionFactory lazyConnectionFactory = injector.getInstance(LazyConnectionFactory.class)) { diff --git a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestRetryingConnectionFactory.java b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestRetryingConnectionFactory.java index 95e83c5d0a0813..59063ca6c5f9d9 100644 --- a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestRetryingConnectionFactory.java +++ b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestRetryingConnectionFactory.java @@ -19,7 +19,6 @@ import com.google.inject.Injector; import com.google.inject.Key; import com.google.inject.Scopes; -import io.trino.plugin.jdbc.RetryingConnectionFactory.RetryStrategy; import io.trino.spi.StandardErrorCode; import io.trino.spi.TrinoException; import io.trino.spi.connector.ConnectorSession; @@ -160,7 +159,7 @@ private static Injector createInjector(MockConnectorFactory.Action... actions) binder.bind(MockConnectorFactory.Action[].class).toInstance(actions); binder.bind(MockConnectorFactory.class).in(Scopes.SINGLETON); binder.bind(ConnectionFactory.class).annotatedWith(ForBaseJdbc.class).to(Key.get(MockConnectorFactory.class)); - binder.install(new RetryingConnectionFactoryModule()); + binder.install(new RetryingModule()); }); } @@ -170,7 +169,7 @@ private static Injector createInjectorWithAdditionalStrategy(MockConnectorFactor binder.bind(MockConnectorFactory.Action[].class).toInstance(actions); binder.bind(MockConnectorFactory.class).in(Scopes.SINGLETON); binder.bind(ConnectionFactory.class).annotatedWith(ForBaseJdbc.class).to(Key.get(MockConnectorFactory.class)); - binder.install(new RetryingConnectionFactoryModule()); + binder.install(new RetryingModule()); newSetBinder(binder, RetryStrategy.class).addBinding().to(AdditionalRetryStrategy.class).in(Scopes.SINGLETON); }); } diff --git a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestRetryingJdbcClient.java b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestRetryingJdbcClient.java new file mode 100644 index 00000000000000..384bd6edbd6bc6 --- /dev/null +++ b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestRetryingJdbcClient.java @@ -0,0 +1,35 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.jdbc; + +import dev.failsafe.RetryPolicy; +import org.junit.jupiter.api.Test; + +import static io.trino.spi.testing.InterfaceTestUtils.assertAllMethodsOverridden; +import static io.trino.spi.testing.InterfaceTestUtils.assertProperForwardingMethodsAreCalled; + +class TestRetryingJdbcClient +{ + @Test + public void testEverythingImplemented() + { + assertAllMethodsOverridden(JdbcClient.class, RetryingJdbcClient.class); + } + + @Test + public void testProperForwardingMethodsAreCalled() + { + assertProperForwardingMethodsAreCalled(JdbcClient.class, jdbcClient -> new RetryingJdbcClient(jdbcClient, RetryPolicy.ofDefaults())); + } +} diff --git a/plugin/trino-oracle/src/main/java/io/trino/plugin/oracle/OracleClientModule.java b/plugin/trino-oracle/src/main/java/io/trino/plugin/oracle/OracleClientModule.java index 219d5d25f2970a..8d858c915fbdaa 100644 --- a/plugin/trino-oracle/src/main/java/io/trino/plugin/oracle/OracleClientModule.java +++ b/plugin/trino-oracle/src/main/java/io/trino/plugin/oracle/OracleClientModule.java @@ -27,7 +27,7 @@ import io.trino.plugin.jdbc.ForBaseJdbc; import io.trino.plugin.jdbc.JdbcClient; import io.trino.plugin.jdbc.MaxDomainCompactionThreshold; -import io.trino.plugin.jdbc.RetryingConnectionFactory.RetryStrategy; +import io.trino.plugin.jdbc.RetryStrategy; import io.trino.plugin.jdbc.TimestampTimeZoneDomain; import io.trino.plugin.jdbc.credential.CredentialProvider; import io.trino.plugin.jdbc.ptf.Query; diff --git a/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixClientModule.java b/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixClientModule.java index 5ead0c117c2e87..90442b03baf238 100644 --- a/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixClientModule.java +++ b/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixClientModule.java @@ -47,7 +47,7 @@ import io.trino.plugin.jdbc.LazyConnectionFactory; import io.trino.plugin.jdbc.MaxDomainCompactionThreshold; import io.trino.plugin.jdbc.QueryBuilder; -import io.trino.plugin.jdbc.RetryingConnectionFactoryModule; +import io.trino.plugin.jdbc.RetryingModule; import io.trino.plugin.jdbc.ReusableConnectionFactoryModule; import io.trino.plugin.jdbc.StatsCollecting; import io.trino.plugin.jdbc.TimestampTimeZoneDomain; @@ -102,7 +102,7 @@ public PhoenixClientModule(String catalogName) protected void setup(Binder binder) { install(new RemoteQueryModifierModule()); - install(new RetryingConnectionFactoryModule()); + install(new RetryingModule()); binder.bind(ConnectorSplitManager.class).annotatedWith(ForJdbcDynamicFiltering.class).to(PhoenixSplitManager.class).in(Scopes.SINGLETON); binder.bind(ConnectorSplitManager.class).annotatedWith(ForClassLoaderSafe.class).to(JdbcDynamicFilteringSplitManager.class).in(Scopes.SINGLETON); binder.bind(ConnectorSplitManager.class).to(ClassLoaderSafeConnectorSplitManager.class).in(Scopes.SINGLETON);