From df3d83371f5ee944c36e82c7726219602849eeaf Mon Sep 17 00:00:00 2001 From: sergiyvamz Date: Fri, 27 Sep 2024 16:29:15 -0700 Subject: [PATCH] failover2 on connect --- .../failover2/FailoverConnectionPlugin.java | 88 ++++++++++++++++++- 1 file changed, 84 insertions(+), 4 deletions(-) diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/FailoverConnectionPlugin.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/FailoverConnectionPlugin.java index 3d74d1705..67a632e15 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/FailoverConnectionPlugin.java +++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/FailoverConnectionPlugin.java @@ -43,6 +43,7 @@ import software.amazon.jdbc.plugin.staledns.AuroraStaleDnsHelper; import software.amazon.jdbc.targetdriverdialect.TargetDriverDialect; import software.amazon.jdbc.util.Messages; +import software.amazon.jdbc.util.PropertyUtils; import software.amazon.jdbc.util.RdsUrlType; import software.amazon.jdbc.util.RdsUtils; import software.amazon.jdbc.util.SqlState; @@ -65,6 +66,8 @@ public class FailoverConnectionPlugin extends AbstractConnectionPlugin { private static final String TELEMETRY_WRITER_FAILOVER = "failover to writer node"; private static final String TELEMETRY_READER_FAILOVER = "failover to replica"; + private static final String INTERNAL_CONNECT_PROPERTY_NAME = "76c06979-49c4-4c86-9600-a63605b83f50"; + public static final AwsWrapperProperty FAILOVER_TIMEOUT_MS = new AwsWrapperProperty( "failoverTimeoutMs", @@ -87,10 +90,18 @@ public class FailoverConnectionPlugin extends AbstractConnectionPlugin { "random", "The strategy that should be used to select a new reader host while opening a new connection."); + public static final AwsWrapperProperty ENABLE_CONNECT_FAILOVER = + new AwsWrapperProperty( + "enableConnectFailover", "false", + "Enable/disable cluster-aware failover if the initial connection to the database fails due to a " + + "network exception. Note that this may result in a connection to a different instance in the cluster " + + "than was specified by the URL."); + private static final Set subscribedMethods = Collections.unmodifiableSet(new HashSet() { { addAll(SubscribedMethodHelper.NETWORK_BOUND_METHODS); + add("connect"); add("initHostProvider"); } }); @@ -317,7 +328,6 @@ protected void dealWithIllegalStateException( * @throws SQLException if an error occurs */ protected void failover(final HostSpec failedHost) throws SQLException { - this.pluginService.setAvailability(failedHost.asAliases(), HostAvailability.NOT_AVAILABLE); if (this.failoverMode == FailoverMode.STRICT_WRITER) { failoverWriter(); @@ -364,6 +374,9 @@ protected void failoverReader() throws SQLException { throw new FailoverFailedSQLException(Messages.get("Failover.unableToConnectToReader")); } + final Properties copyProp = PropertyUtils.copyProperties(this.properties); + copyProp.setProperty(INTERNAL_CONNECT_PROPERTY_NAME, "true"); + final List hosts = this.pluginService.getHosts(); Connection readerCandidateConn = null; HostSpec readerCandidate = null; @@ -389,7 +402,7 @@ protected void failoverReader() throws SQLException { } try { - readerCandidateConn = this.pluginService.connect(readerCandidate, this.properties); + readerCandidateConn = this.pluginService.connect(readerCandidate, copyProp); if (this.pluginService.getHostRole(readerCandidateConn) != HostRole.READER) { readerCandidateConn.close(); readerCandidateConn = null; @@ -412,7 +425,7 @@ protected void failoverReader() throws SQLException { this.failoverReaderHostSelectorStrategySetting); if (readerCandidate != null) { try { - readerCandidateConn = this.pluginService.connect(readerCandidate, this.properties); + readerCandidateConn = this.pluginService.connect(readerCandidate, copyProp); } catch (SQLException ex) { readerCandidate = null; } @@ -480,6 +493,8 @@ protected void failoverWriter() throws SQLException { } final List updatedHosts = this.pluginService.getHosts(); + final Properties copyProp = PropertyUtils.copyProperties(this.properties); + copyProp.setProperty(INTERNAL_CONNECT_PROPERTY_NAME, "true"); Connection writerCandidateConn = null; final HostSpec writerCandidate = updatedHosts.stream() @@ -489,7 +504,7 @@ protected void failoverWriter() throws SQLException { if (writerCandidate != null) { try { - writerCandidateConn = this.pluginService.connect(writerCandidate, this.properties); + writerCandidateConn = this.pluginService.connect(writerCandidate, copyProp); } catch (SQLException ex) { // do nothing } @@ -606,4 +621,69 @@ protected boolean canDirectExecute(final String methodName) { || methodName.equals(METHOD_IS_CLOSED) || methodName.equals(METHOD_ABORT)); } + + @Override + public Connection connect( + final String driverProtocol, + final HostSpec hostSpec, + final Properties props, + final boolean isInitialConnection, + final JdbcCallable connectFunc) + throws SQLException { + + if (!ENABLE_CONNECT_FAILOVER.getBoolean(props)) { + return connectFunc.call(); + } + + // This call was initiated by this failover2 plugin and doesn't require any additional processing. + if (props.containsKey(INTERNAL_CONNECT_PROPERTY_NAME)) { + return connectFunc.call(); + } + + Connection conn = null; + + final HostSpec hostSpecWithAvailability = this.pluginService.getHosts().stream() + .filter(x -> x.getHostAndPort().equals(hostSpec.getHostAndPort())) + .findFirst() + .orElse(null); + + if (hostSpecWithAvailability == null + || hostSpecWithAvailability.getAvailability() != HostAvailability.NOT_AVAILABLE) { + + try { + conn = this.staleDnsHelper.getVerifiedConnection(isInitialConnection, this.hostListProviderService, + driverProtocol, hostSpec, props, connectFunc); + } catch (final SQLException e) { + if (!this.shouldExceptionTriggerConnectionSwitch(e)) { + throw e; + } + + this.pluginService.setAvailability(hostSpec.asAliases(), HostAvailability.NOT_AVAILABLE); + + try { + this.failover(hostSpec); + } catch (FailoverSuccessSQLException failoverSuccessException) { + conn = this.pluginService.getCurrentConnection(); + } + } + } else { + try { + this.pluginService.refreshHostList(); + this.failover(hostSpec); + } catch (FailoverSuccessSQLException failoverSuccessException) { + conn = this.pluginService.getCurrentConnection(); + } + } + + if (conn == null) { + // This should be unreachable, the above logic will either get a connection successfully or throw an exception. + throw new SQLException(Messages.get("Failover.unableToConnect")); + } + + if (isInitialConnection) { + this.pluginService.refreshHostList(conn); + } + + return conn; + } }