From 935f9c76c7a540f241544aa59932635dff9e1402 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Grzegorz=20Kokosi=C5=84ski?= Date: Thu, 5 Sep 2024 15:30:09 +0200 Subject: [PATCH] Retry creating JdbcRecordCursor Creating PreparedStatement can fail due transient issues in remote database. Let's retry it in the same as in JdbcClient. --- .../java/io/trino/plugin/jdbc/JdbcRecordSet.java | 15 +++++++++++++-- .../trino/plugin/jdbc/JdbcRecordSetProvider.java | 7 ++++++- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcRecordSet.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcRecordSet.java index 9d3d3269055a96..8b5b19fded0358 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcRecordSet.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcRecordSet.java @@ -14,6 +14,7 @@ package io.trino.plugin.jdbc; import com.google.common.collect.ImmutableList; +import dev.failsafe.RetryPolicy; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.RecordCursor; import io.trino.spi.connector.RecordSet; @@ -22,6 +23,7 @@ import java.util.List; import java.util.concurrent.ExecutorService; +import static io.trino.plugin.jdbc.RetryingModule.retry; import static java.util.Objects.requireNonNull; public class JdbcRecordSet @@ -34,8 +36,16 @@ public class JdbcRecordSet private final List columnTypes; private final JdbcSplit split; private final ConnectorSession session; + private final RetryPolicy policy; - public JdbcRecordSet(JdbcClient jdbcClient, ExecutorService executor, ConnectorSession session, JdbcSplit split, BaseJdbcConnectorTableHandle table, List columnHandles) + public JdbcRecordSet( + JdbcClient jdbcClient, + ExecutorService executor, + ConnectorSession session, + RetryPolicy policy, + JdbcSplit split, + BaseJdbcConnectorTableHandle table, + List columnHandles) { this.jdbcClient = requireNonNull(jdbcClient, "jdbcClient is null"); this.executor = requireNonNull(executor, "executor is null"); @@ -49,6 +59,7 @@ public JdbcRecordSet(JdbcClient jdbcClient, ExecutorService executor, ConnectorS } this.columnTypes = types.build(); this.session = requireNonNull(session, "session is null"); + this.policy = requireNonNull(policy, "policy is null"); } @Override @@ -60,6 +71,6 @@ public List getColumnTypes() @Override public RecordCursor cursor() { - return new JdbcRecordCursor(jdbcClient, executor, session, split, table, columnHandles); + return retry(policy, () -> new JdbcRecordCursor(jdbcClient, executor, session, split, table, columnHandles)); } } diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcRecordSetProvider.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcRecordSetProvider.java index f32b0404922482..0e09450e885f69 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcRecordSetProvider.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcRecordSetProvider.java @@ -16,6 +16,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.inject.Inject; +import dev.failsafe.RetryPolicy; import io.trino.plugin.base.MappedRecordSet; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ConnectorRecordSetProvider; @@ -41,12 +42,14 @@ public class JdbcRecordSetProvider { private final JdbcClient jdbcClient; private final ExecutorService executor; + private final RetryPolicy policy; @Inject - public JdbcRecordSetProvider(JdbcClient jdbcClient, @ForRecordCursor ExecutorService executor) + public JdbcRecordSetProvider(JdbcClient jdbcClient, @ForRecordCursor ExecutorService executor, RetryPolicy policy) { this.jdbcClient = requireNonNull(jdbcClient, "jdbcClient is null"); this.executor = requireNonNull(executor, "executor is null"); + this.policy = requireNonNull(policy, "policy is null"); } @Override @@ -72,6 +75,7 @@ public RecordSet getRecordSet(ConnectorTransactionHandle transaction, ConnectorS jdbcClient, executor, session, + policy, jdbcSplit, jdbcTableHandle.intersectedWithConstraint(jdbcSplit.getDynamicFilter().transformKeys(ColumnHandle.class::cast)), handles.build()); @@ -88,6 +92,7 @@ public RecordSet getRecordSet(ConnectorTransactionHandle transaction, ConnectorS jdbcClient, executor, session, + policy, jdbcSplit, procedureHandle, sourceColumns),