From 39827a8ecd5152e6c6ba8a349efb67c3745b682c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Grzegorz=20Kokosi=C5=84ski?= Date: Thu, 5 Sep 2024 15:20:03 +0200 Subject: [PATCH] Introduce RetryingJdbcClient So far JDBC connectors were able to retry in case of transient issues when establishing connections. RetryingJdbcClient is able to retry an operation on remote database during other situations. --- .../trino/plugin/jdbc/CachingJdbcClient.java | 2 +- .../java/io/trino/plugin/jdbc/JdbcModule.java | 2 +- .../io/trino/plugin/jdbc/RetryStrategy.java | 19 + .../jdbc/RetryingConnectionFactory.java | 42 +- .../jdbc/RetryingConnectionFactoryModule.java | 32 -- .../trino/plugin/jdbc/RetryingJdbcClient.java | 527 ++++++++++++++++++ .../io/trino/plugin/jdbc/RetryingModule.java | 88 +++ .../jdbc/TestLazyConnectionFactory.java | 4 +- .../jdbc/TestRetryingConnectionFactory.java | 5 +- .../plugin/jdbc/TestRetryingJdbcClient.java | 35 ++ .../plugin/oracle/OracleClientModule.java | 2 +- .../plugin/phoenix5/PhoenixClientModule.java | 4 +- 12 files changed, 681 insertions(+), 81 deletions(-) create mode 100644 plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryStrategy.java delete mode 100644 plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingConnectionFactoryModule.java create mode 100644 plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingJdbcClient.java create mode 100644 plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingModule.java create mode 100644 plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestRetryingJdbcClient.java diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/CachingJdbcClient.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/CachingJdbcClient.java index 44f1a81ecba8d9..a0e8f0f473becf 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/CachingJdbcClient.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/CachingJdbcClient.java @@ -93,7 +93,7 @@ public class CachingJdbcClient @Inject public CachingJdbcClient( - @StatsCollecting JdbcClient delegate, + RetryingJdbcClient delegate, Set sessionPropertiesProviders, IdentityCacheMapping identityMapping, BaseJdbcConfig config) diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcModule.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcModule.java index bf354ee3517817..84ac535615a56d 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcModule.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcModule.java @@ -51,7 +51,7 @@ public void setup(Binder binder) install(new JdbcDiagnosticModule()); install(new IdentifierMappingModule()); install(new RemoteQueryModifierModule()); - install(new RetryingConnectionFactoryModule()); + install(new RetryingModule()); newOptionalBinder(binder, ConnectorAccessControl.class); newOptionalBinder(binder, QueryBuilder.class).setDefault().to(DefaultQueryBuilder.class).in(Scopes.SINGLETON); diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryStrategy.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryStrategy.java new file mode 100644 index 00000000000000..183912a9706f2c --- /dev/null +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryStrategy.java @@ -0,0 +1,19 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.jdbc; + +public interface RetryStrategy +{ + boolean isExceptionRecoverable(Throwable exception); +} diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingConnectionFactory.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingConnectionFactory.java index f2003a7002a8e6..2a8220c856774a 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingConnectionFactory.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingConnectionFactory.java @@ -13,49 +13,29 @@ */ package io.trino.plugin.jdbc; -import com.google.common.base.Throwables; import com.google.inject.Inject; import dev.failsafe.Failsafe; import dev.failsafe.FailsafeException; import dev.failsafe.RetryPolicy; import io.trino.plugin.jdbc.jmx.StatisticsAwareConnectionFactory; -import io.trino.spi.TrinoException; import io.trino.spi.connector.ConnectorSession; import java.sql.Connection; import java.sql.SQLException; -import java.sql.SQLTransientException; -import java.util.Set; -import static java.time.temporal.ChronoUnit.MILLIS; -import static java.time.temporal.ChronoUnit.SECONDS; import static java.util.Objects.requireNonNull; public class RetryingConnectionFactory implements ConnectionFactory { - private final RetryPolicy retryPolicy; - private final ConnectionFactory delegate; + private final RetryPolicy retryPolicy; @Inject - public RetryingConnectionFactory(StatisticsAwareConnectionFactory delegate, Set retryStrategies) + public RetryingConnectionFactory(StatisticsAwareConnectionFactory delegate, RetryPolicy retryPolicy) { - requireNonNull(retryStrategies); this.delegate = requireNonNull(delegate, "delegate is null"); - this.retryPolicy = RetryPolicy.builder() - .withMaxDuration(java.time.Duration.of(30, SECONDS)) - .withMaxAttempts(5) - .withBackoff(50, 5_000, MILLIS, 4) - .handleIf(throwable -> isExceptionRecoverable(retryStrategies, throwable)) - .abortOn(TrinoException.class) - .build(); - } - - private static boolean isExceptionRecoverable(Set retryStrategies, Throwable throwable) - { - return retryStrategies.stream() - .anyMatch(retryStrategy -> retryStrategy.isExceptionRecoverable(throwable)); + this.retryPolicy = requireNonNull(retryPolicy, "retryPolicy is null"); } @Override @@ -80,20 +60,4 @@ public void close() { delegate.close(); } - - public interface RetryStrategy - { - boolean isExceptionRecoverable(Throwable exception); - } - - public static class DefaultRetryStrategy - implements RetryStrategy - { - @Override - public boolean isExceptionRecoverable(Throwable exception) - { - return Throwables.getCausalChain(exception).stream() - .anyMatch(SQLTransientException.class::isInstance); - } - } } diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingConnectionFactoryModule.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingConnectionFactoryModule.java deleted file mode 100644 index bc047b5386ea80..00000000000000 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingConnectionFactoryModule.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.plugin.jdbc; - -import com.google.inject.AbstractModule; -import com.google.inject.Scopes; -import io.trino.plugin.jdbc.RetryingConnectionFactory.DefaultRetryStrategy; -import io.trino.plugin.jdbc.RetryingConnectionFactory.RetryStrategy; - -import static com.google.inject.multibindings.Multibinder.newSetBinder; - -public class RetryingConnectionFactoryModule - extends AbstractModule -{ - @Override - public void configure() - { - bind(RetryingConnectionFactory.class).in(Scopes.SINGLETON); - newSetBinder(binder(), RetryStrategy.class).addBinding().to(DefaultRetryStrategy.class).in(Scopes.SINGLETON); - } -} diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingJdbcClient.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingJdbcClient.java new file mode 100644 index 00000000000000..240ac46bae9239 --- /dev/null +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingJdbcClient.java @@ -0,0 +1,527 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.jdbc; + +import com.google.inject.Inject; +import dev.failsafe.RetryPolicy; +import io.trino.plugin.jdbc.JdbcProcedureHandle.ProcedureQuery; +import io.trino.plugin.jdbc.expression.ParameterizedExpression; +import io.trino.spi.connector.AggregateFunction; +import io.trino.spi.connector.ColumnHandle; +import io.trino.spi.connector.ColumnMetadata; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorSplitSource; +import io.trino.spi.connector.ConnectorTableMetadata; +import io.trino.spi.connector.JoinStatistics; +import io.trino.spi.connector.JoinType; +import io.trino.spi.connector.RelationColumnsMetadata; +import io.trino.spi.connector.RelationCommentMetadata; +import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.connector.SystemTable; +import io.trino.spi.connector.TableScanRedirectApplicationResult; +import io.trino.spi.expression.ConnectorExpression; +import io.trino.spi.statistics.TableStatistics; +import io.trino.spi.type.Type; + +import java.sql.CallableStatement; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; +import java.util.Set; + +import static io.trino.plugin.jdbc.RetryingModule.retry; +import static java.util.Objects.requireNonNull; + +public class RetryingJdbcClient + implements JdbcClient +{ + private final JdbcClient delegate; + private final RetryPolicy policy; + + @Inject + public RetryingJdbcClient(@StatsCollecting JdbcClient delegate, RetryPolicy policy) + { + this.delegate = requireNonNull(delegate, "delegate is null"); + this.policy = requireNonNull(policy, "policy is null"); + } + + @Override + public boolean schemaExists(ConnectorSession session, String schema) + { + return retry(policy, () -> delegate.schemaExists(session, schema)); + } + + @Override + public Set getSchemaNames(ConnectorSession session) + { + return retry(policy, () -> delegate.getSchemaNames(session)); + } + + @Override + public List getTableNames(ConnectorSession session, Optional schema) + { + return retry(policy, () -> delegate.getTableNames(session, schema)); + } + + @Override + public Optional getTableHandle(ConnectorSession session, SchemaTableName schemaTableName) + { + return retry(policy, () -> delegate.getTableHandle(session, schemaTableName)); + } + + @Override + public JdbcTableHandle getTableHandle(ConnectorSession session, PreparedQuery preparedQuery) + { + return retry(policy, () -> delegate.getTableHandle(session, preparedQuery)); + } + + @Override + public JdbcProcedureHandle getProcedureHandle(ConnectorSession session, ProcedureQuery procedureQuery) + { + return retry(policy, () -> delegate.getProcedureHandle(session, procedureQuery)); + } + + @Override + public List getColumns(ConnectorSession session, JdbcTableHandle tableHandle) + { + return retry(policy, () -> delegate.getColumns(session, tableHandle)); + } + + @Override + public Iterator getAllTableColumns(ConnectorSession session, Optional schema) + { + return retry(policy, () -> delegate.getAllTableColumns(session, schema)); + } + + @Override + public List getAllTableComments(ConnectorSession session, Optional schema) + { + return retry(policy, () -> delegate.getAllTableComments(session, schema)); + } + + @Override + public Optional toColumnMapping(ConnectorSession session, Connection connection, JdbcTypeHandle typeHandle) + { + // no retrying as it could be not idempotent operation (connection could be not reusable after the first failure) + return delegate.toColumnMapping(session, connection, typeHandle); + } + + @Override + public List toColumnMappings(ConnectorSession session, List typeHandles) + { + // there should be no remote database interaction + return delegate.toColumnMappings(session, typeHandles); + } + + @Override + public WriteMapping toWriteMapping(ConnectorSession session, Type type) + { + // there should be no remote database interaction + return delegate.toWriteMapping(session, type); + } + + @Override + public Optional getSupportedType(ConnectorSession session, Type type) + { + // there should be no remote database interaction + return delegate.getSupportedType(session, type); + } + + @Override + public boolean supportsAggregationPushdown(ConnectorSession session, JdbcTableHandle table, List aggregates, Map assignments, List> groupingSets) + { + // there should be no remote database interaction + return delegate.supportsAggregationPushdown(session, table, aggregates, assignments, groupingSets); + } + + @Override + public Optional implementAggregation(ConnectorSession session, AggregateFunction aggregate, Map assignments) + { + // there should be no remote database interaction + return delegate.implementAggregation(session, aggregate, assignments); + } + + @Override + public Optional convertPredicate(ConnectorSession session, ConnectorExpression expression, Map assignments) + { + // there should be no remote database interaction + return delegate.convertPredicate(session, expression, assignments); + } + + @Override + public Optional convertProjection(ConnectorSession session, JdbcTableHandle handle, ConnectorExpression expression, Map assignments) + { + // there should be no remote database interaction + return delegate.convertProjection(session, handle, expression, assignments); + } + + @Override + public ConnectorSplitSource getSplits(ConnectorSession session, JdbcTableHandle tableHandle) + { + return retry(policy, () -> delegate.getSplits(session, tableHandle)); + } + + @Override + public ConnectorSplitSource getSplits(ConnectorSession session, JdbcProcedureHandle procedureHandle) + { + return retry(policy, () -> delegate.getSplits(session, procedureHandle)); + } + + @Override + public Connection getConnection(ConnectorSession session, JdbcSplit split, JdbcTableHandle tableHandle) + throws SQLException + { + // retry already implemented by RetryingConnectionFactory + return delegate.getConnection(session, split, tableHandle); + } + + @Override + public Connection getConnection(ConnectorSession session, JdbcSplit split, JdbcProcedureHandle procedureHandle) + throws SQLException + { + // retry already implemented by RetryingConnectionFactory + return delegate.getConnection(session, split, procedureHandle); + } + + @Override + public void abortReadConnection(Connection connection, ResultSet resultSet) + throws SQLException + { + // no retrying as it could be not idempotent operation (connection could be not reusable after the first failure) + delegate.abortReadConnection(connection, resultSet); + } + + @Override + public PreparedQuery prepareQuery(ConnectorSession session, JdbcTableHandle table, Optional>> groupingSets, List columns, Map columnExpressions) + { + // there should be no remote database interaction + return delegate.prepareQuery(session, table, groupingSets, columns, columnExpressions); + } + + @Override + public PreparedStatement buildSql(ConnectorSession session, Connection connection, JdbcSplit split, JdbcTableHandle table, List columns) + throws SQLException + { + // no retrying as it could be not idempotent operation (connection could be not reusable after the first failure) + return delegate.buildSql(session, connection, split, table, columns); + } + + @Override + public CallableStatement buildProcedure(ConnectorSession session, Connection connection, JdbcSplit split, JdbcProcedureHandle procedureHandle) + throws SQLException + { + // no retrying as it could be not idempotent operation (connection could be not reusable after the first failure) + return delegate.buildProcedure(session, connection, split, procedureHandle); + } + + @Override + public Optional implementJoin(ConnectorSession session, JoinType joinType, PreparedQuery leftSource, Map leftProjections, PreparedQuery rightSource, Map rightProjections, List joinConditions, JoinStatistics statistics) + { + // there should be no remote database interaction + return delegate.implementJoin(session, joinType, leftSource, leftProjections, rightSource, rightProjections, joinConditions, statistics); + } + + @Override + public Optional legacyImplementJoin(ConnectorSession session, JoinType joinType, PreparedQuery leftSource, PreparedQuery rightSource, List joinConditions, Map rightAssignments, Map leftAssignments, JoinStatistics statistics) + { + // there should be no remote database interaction + return delegate.legacyImplementJoin(session, joinType, leftSource, rightSource, joinConditions, rightAssignments, leftAssignments, statistics); + } + + @Override + public boolean supportsTopN(ConnectorSession session, JdbcTableHandle handle, List sortOrder) + { + // there should be no remote database interaction + return delegate.supportsTopN(session, handle, sortOrder); + } + + @Override + public boolean isTopNGuaranteed(ConnectorSession session) + { + // there should be no remote database interaction + return delegate.isTopNGuaranteed(session); + } + + @Override + public boolean supportsLimit() + { + // there should be no remote database interaction + return delegate.supportsLimit(); + } + + @Override + public boolean isLimitGuaranteed(ConnectorSession session) + { + // there should be no remote database interaction + return delegate.isLimitGuaranteed(session); + } + + @Override + public Optional getTableComment(ResultSet resultSet) + throws SQLException + { + // no retrying as it could be not idempotent operation + return delegate.getTableComment(resultSet); + } + + @Override + public void setTableComment(ConnectorSession session, JdbcTableHandle handle, Optional comment) + { + // no retrying as it could be not idempotent operation + delegate.setTableComment(session, handle, comment); + } + + @Override + public void setColumnComment(ConnectorSession session, JdbcTableHandle handle, JdbcColumnHandle column, Optional comment) + { + // no retrying as it could be not idempotent operation + delegate.setColumnComment(session, handle, column, comment); + } + + @Override + public void addColumn(ConnectorSession session, JdbcTableHandle handle, ColumnMetadata column) + { + // no retrying as it could be not idempotent operation + delegate.addColumn(session, handle, column); + } + + @Override + public void dropColumn(ConnectorSession session, JdbcTableHandle handle, JdbcColumnHandle column) + { + // no retrying as it could be not idempotent operation + delegate.dropColumn(session, handle, column); + } + + @Override + public void renameColumn(ConnectorSession session, JdbcTableHandle handle, JdbcColumnHandle jdbcColumn, String newColumnName) + { + // no retrying as it could be not idempotent operation + delegate.renameColumn(session, handle, jdbcColumn, newColumnName); + } + + @Override + public void setColumnType(ConnectorSession session, JdbcTableHandle handle, JdbcColumnHandle column, Type type) + { + // no retrying as it could be not idempotent operation + delegate.setColumnType(session, handle, column, type); + } + + @Override + public void dropNotNullConstraint(ConnectorSession session, JdbcTableHandle handle, JdbcColumnHandle column) + { + // no retrying as it could be not idempotent operation + delegate.dropNotNullConstraint(session, handle, column); + } + + @Override + public void renameTable(ConnectorSession session, JdbcTableHandle handle, SchemaTableName newTableName) + { + // no retrying as it could be not idempotent operation + delegate.renameTable(session, handle, newTableName); + } + + @Override + public void setTableProperties(ConnectorSession session, JdbcTableHandle handle, Map> properties) + { + // no retrying as it could be not idempotent operation + delegate.setTableProperties(session, handle, properties); + } + + @Override + public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata) + { + // no retrying as it could be not idempotent operation + delegate.createTable(session, tableMetadata); + } + + @Override + public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata) + { + // no retrying as it could be not idempotent operation + return delegate.beginCreateTable(session, tableMetadata); + } + + @Override + public void commitCreateTable(ConnectorSession session, JdbcOutputTableHandle handle, Set pageSinkIds) + { + // no retrying as it could be not idempotent operation + delegate.commitCreateTable(session, handle, pageSinkIds); + } + + @Override + public JdbcOutputTableHandle beginInsertTable(ConnectorSession session, JdbcTableHandle tableHandle, List columns) + { + // no retrying as it could be not idempotent operation + return delegate.beginInsertTable(session, tableHandle, columns); + } + + @Override + public void finishInsertTable(ConnectorSession session, JdbcOutputTableHandle handle, Set pageSinkIds) + { + // no retrying as it could be not idempotent operation + delegate.finishInsertTable(session, handle, pageSinkIds); + } + + @Override + public void dropTable(ConnectorSession session, JdbcTableHandle jdbcTableHandle) + { + // no retrying as it could be not idempotent operation + delegate.dropTable(session, jdbcTableHandle); + } + + @Override + public void rollbackCreateTable(ConnectorSession session, JdbcOutputTableHandle handle) + { + // no retrying as it could be not idempotent operation + delegate.rollbackCreateTable(session, handle); + } + + @Override + public boolean supportsRetries() + { + // there should be no remote database interaction + return delegate.supportsRetries(); + } + + @Override + public String buildInsertSql(JdbcOutputTableHandle handle, List columnWriters) + { + // there should be no remote database interaction + return delegate.buildInsertSql(handle, columnWriters); + } + + @Override + public Connection getConnection(ConnectorSession session) + throws SQLException + { + // retry already implemented by RetryingConnectionFactory + return delegate.getConnection(session); + } + + @Override + public Connection getConnection(ConnectorSession session, JdbcOutputTableHandle handle) + throws SQLException + { + // retry already implemented by RetryingConnectionFactory + return delegate.getConnection(session); + } + + @Override + public PreparedStatement getPreparedStatement(Connection connection, String sql, Optional columnCount) + throws SQLException + { + // no retrying as it could be not idempotent operation (connection could be not reusable after the first failure) + return delegate.getPreparedStatement(connection, sql, columnCount); + } + + @Override + public TableStatistics getTableStatistics(ConnectorSession session, JdbcTableHandle handle) + { + return retry(policy, () -> delegate.getTableStatistics(session, handle)); + } + + @Override + public void createSchema(ConnectorSession session, String schemaName) + { + // no retrying as it could be not idempotent operation + delegate.createSchema(session, schemaName); + } + + @Override + public void dropSchema(ConnectorSession session, String schemaName, boolean cascade) + { + // no retrying as it could be not idempotent operation + delegate.dropSchema(session, schemaName, cascade); + } + + @Override + public void renameSchema(ConnectorSession session, String schemaName, String newSchemaName) + { + // no retrying as it could be not idempotent operation + delegate.renameSchema(session, schemaName, newSchemaName); + } + + @Override + public Optional getSystemTable(ConnectorSession session, SchemaTableName tableName) + { + // there should be no remote database interaction + return delegate.getSystemTable(session, tableName); + } + + @Override + public String quoted(String name) + { + // there should be no remote database interaction + return delegate.quoted(name); + } + + @Override + public String quoted(RemoteTableName remoteTableName) + { + // there should be no remote database interaction + return delegate.quoted(remoteTableName); + } + + @Override + public Map getTableProperties(ConnectorSession session, JdbcTableHandle tableHandle) + { + return retry(policy, () -> delegate.getTableProperties(session, tableHandle)); + } + + @Override + public Optional getTableScanRedirection(ConnectorSession session, JdbcTableHandle tableHandle) + { + return retry(policy, () -> delegate.getTableScanRedirection(session, tableHandle)); + } + + @Override + public OptionalLong delete(ConnectorSession session, JdbcTableHandle handle) + { + // no retrying as it could be not idempotent operation + return delegate.delete(session, handle); + } + + @Override + public void truncateTable(ConnectorSession session, JdbcTableHandle handle) + { + // no retrying as it could be not idempotent operation + delegate.truncateTable(session, handle); + } + + @Override + public OptionalLong update(ConnectorSession session, JdbcTableHandle handle) + { + // no retrying as it could be not idempotent operation + return delegate.update(session, handle); + } + + @Override + public OptionalInt getMaxWriteParallelism(ConnectorSession session) + { + return retry(policy, () -> delegate.getMaxWriteParallelism(session)); + } + + @Override + public OptionalInt getMaxColumnNameLength(ConnectorSession session) + { + return retry(policy, () -> delegate.getMaxColumnNameLength(session)); + } +} diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingModule.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingModule.java new file mode 100644 index 00000000000000..4b66b93ef77844 --- /dev/null +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingModule.java @@ -0,0 +1,88 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.jdbc; + +import com.google.common.base.Throwables; +import com.google.inject.AbstractModule; +import com.google.inject.Provides; +import com.google.inject.Scopes; +import com.google.inject.Singleton; +import dev.failsafe.Failsafe; +import dev.failsafe.FailsafeException; +import dev.failsafe.RetryPolicy; +import dev.failsafe.function.CheckedSupplier; +import io.trino.spi.TrinoException; + +import java.sql.SQLTransientException; +import java.util.Set; + +import static com.google.inject.multibindings.Multibinder.newSetBinder; +import static java.time.temporal.ChronoUnit.MILLIS; +import static java.time.temporal.ChronoUnit.SECONDS; + +public class RetryingModule + extends AbstractModule +{ + @Override + public void configure() + { + bind(RetryingConnectionFactory.class).in(Scopes.SINGLETON); + bind(RetryingJdbcClient.class).in(Scopes.SINGLETON); + newSetBinder(binder(), RetryStrategy.class).addBinding().to(RetryOnSqlTransientException.class).in(Scopes.SINGLETON); + } + + @Provides + @Singleton + public RetryPolicy createRetryPolicy(Set retryStrategies) + { + return RetryPolicy.builder() + .withMaxDuration(java.time.Duration.of(30, SECONDS)) + .withMaxAttempts(5) + .withBackoff(50, 5_000, MILLIS, 4) + .handleIf(throwable -> isExceptionRecoverable(retryStrategies, throwable)) + .abortOn(TrinoException.class) + .build(); + } + + public static T retry(RetryPolicy policy, CheckedSupplier supplier) + { + try { + return Failsafe.with(policy) + .get(supplier); + } + catch (FailsafeException ex) { + if (ex.getCause() instanceof TrinoException) { + throw (TrinoException) ex.getCause(); + } + throw ex; + } + } + + private static boolean isExceptionRecoverable(Set retryStrategies, Throwable throwable) + { + return retryStrategies.stream() + .anyMatch(retryStrategy -> retryStrategy.isExceptionRecoverable(throwable)); + } + + private static class RetryOnSqlTransientException + implements RetryStrategy + { + @Override + public boolean isExceptionRecoverable(Throwable exception) + { + return Throwables.getCausalChain(exception).stream() + .anyMatch(SQLTransientException.class::isInstance); + } + } +} diff --git a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestLazyConnectionFactory.java b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestLazyConnectionFactory.java index 797faadd65b357..508d45b559c8a0 100644 --- a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestLazyConnectionFactory.java +++ b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestLazyConnectionFactory.java @@ -37,7 +37,7 @@ public void testNoConnectionIsCreated() session -> { throw new AssertionError("Expected no connection creation"); }); - binder.install(new RetryingConnectionFactoryModule()); + binder.install(new RetryingModule()); }); try (LazyConnectionFactory lazyConnectionFactory = injector.getInstance(LazyConnectionFactory.class); @@ -56,7 +56,7 @@ public void testConnectionCannotBeReusedAfterClose() Injector injector = Guice.createInjector(binder -> { binder.bind(ConnectionFactory.class).annotatedWith(ForBaseJdbc.class).toInstance( DriverConnectionFactory.builder(new Driver(), config.getConnectionUrl(), new EmptyCredentialProvider()).build()); - binder.install(new RetryingConnectionFactoryModule()); + binder.install(new RetryingModule()); }); try (LazyConnectionFactory lazyConnectionFactory = injector.getInstance(LazyConnectionFactory.class)) { diff --git a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestRetryingConnectionFactory.java b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestRetryingConnectionFactory.java index 95e83c5d0a0813..59063ca6c5f9d9 100644 --- a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestRetryingConnectionFactory.java +++ b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestRetryingConnectionFactory.java @@ -19,7 +19,6 @@ import com.google.inject.Injector; import com.google.inject.Key; import com.google.inject.Scopes; -import io.trino.plugin.jdbc.RetryingConnectionFactory.RetryStrategy; import io.trino.spi.StandardErrorCode; import io.trino.spi.TrinoException; import io.trino.spi.connector.ConnectorSession; @@ -160,7 +159,7 @@ private static Injector createInjector(MockConnectorFactory.Action... actions) binder.bind(MockConnectorFactory.Action[].class).toInstance(actions); binder.bind(MockConnectorFactory.class).in(Scopes.SINGLETON); binder.bind(ConnectionFactory.class).annotatedWith(ForBaseJdbc.class).to(Key.get(MockConnectorFactory.class)); - binder.install(new RetryingConnectionFactoryModule()); + binder.install(new RetryingModule()); }); } @@ -170,7 +169,7 @@ private static Injector createInjectorWithAdditionalStrategy(MockConnectorFactor binder.bind(MockConnectorFactory.Action[].class).toInstance(actions); binder.bind(MockConnectorFactory.class).in(Scopes.SINGLETON); binder.bind(ConnectionFactory.class).annotatedWith(ForBaseJdbc.class).to(Key.get(MockConnectorFactory.class)); - binder.install(new RetryingConnectionFactoryModule()); + binder.install(new RetryingModule()); newSetBinder(binder, RetryStrategy.class).addBinding().to(AdditionalRetryStrategy.class).in(Scopes.SINGLETON); }); } diff --git a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestRetryingJdbcClient.java b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestRetryingJdbcClient.java new file mode 100644 index 00000000000000..384bd6edbd6bc6 --- /dev/null +++ b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestRetryingJdbcClient.java @@ -0,0 +1,35 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.jdbc; + +import dev.failsafe.RetryPolicy; +import org.junit.jupiter.api.Test; + +import static io.trino.spi.testing.InterfaceTestUtils.assertAllMethodsOverridden; +import static io.trino.spi.testing.InterfaceTestUtils.assertProperForwardingMethodsAreCalled; + +class TestRetryingJdbcClient +{ + @Test + public void testEverythingImplemented() + { + assertAllMethodsOverridden(JdbcClient.class, RetryingJdbcClient.class); + } + + @Test + public void testProperForwardingMethodsAreCalled() + { + assertProperForwardingMethodsAreCalled(JdbcClient.class, jdbcClient -> new RetryingJdbcClient(jdbcClient, RetryPolicy.ofDefaults())); + } +} diff --git a/plugin/trino-oracle/src/main/java/io/trino/plugin/oracle/OracleClientModule.java b/plugin/trino-oracle/src/main/java/io/trino/plugin/oracle/OracleClientModule.java index 219d5d25f2970a..8d858c915fbdaa 100644 --- a/plugin/trino-oracle/src/main/java/io/trino/plugin/oracle/OracleClientModule.java +++ b/plugin/trino-oracle/src/main/java/io/trino/plugin/oracle/OracleClientModule.java @@ -27,7 +27,7 @@ import io.trino.plugin.jdbc.ForBaseJdbc; import io.trino.plugin.jdbc.JdbcClient; import io.trino.plugin.jdbc.MaxDomainCompactionThreshold; -import io.trino.plugin.jdbc.RetryingConnectionFactory.RetryStrategy; +import io.trino.plugin.jdbc.RetryStrategy; import io.trino.plugin.jdbc.TimestampTimeZoneDomain; import io.trino.plugin.jdbc.credential.CredentialProvider; import io.trino.plugin.jdbc.ptf.Query; diff --git a/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixClientModule.java b/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixClientModule.java index 5ead0c117c2e87..90442b03baf238 100644 --- a/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixClientModule.java +++ b/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixClientModule.java @@ -47,7 +47,7 @@ import io.trino.plugin.jdbc.LazyConnectionFactory; import io.trino.plugin.jdbc.MaxDomainCompactionThreshold; import io.trino.plugin.jdbc.QueryBuilder; -import io.trino.plugin.jdbc.RetryingConnectionFactoryModule; +import io.trino.plugin.jdbc.RetryingModule; import io.trino.plugin.jdbc.ReusableConnectionFactoryModule; import io.trino.plugin.jdbc.StatsCollecting; import io.trino.plugin.jdbc.TimestampTimeZoneDomain; @@ -102,7 +102,7 @@ public PhoenixClientModule(String catalogName) protected void setup(Binder binder) { install(new RemoteQueryModifierModule()); - install(new RetryingConnectionFactoryModule()); + install(new RetryingModule()); binder.bind(ConnectorSplitManager.class).annotatedWith(ForJdbcDynamicFiltering.class).to(PhoenixSplitManager.class).in(Scopes.SINGLETON); binder.bind(ConnectorSplitManager.class).annotatedWith(ForClassLoaderSafe.class).to(JdbcDynamicFilteringSplitManager.class).in(Scopes.SINGLETON); binder.bind(ConnectorSplitManager.class).to(ClassLoaderSafeConnectorSplitManager.class).in(Scopes.SINGLETON);