From 8057c39c721368e63d7ff9f03463d6b309021982 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 23 Sep 2024 16:26:23 +0800 Subject: [PATCH] [fix] FIx ArrayIndexOutOfBoundsException when using SameAuthParamsLookupAutoClusterFailover --- ...meAuthParamsLookupAutoClusterFailover.java | 51 +++++++++++-------- 1 file changed, 29 insertions(+), 22 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java index 4beff4719c895..83d53b8bb101d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java @@ -71,29 +71,36 @@ public void initialize(PulsarClient client) { this.executor = EventLoopUtil.newEventLoopGroup(1, false, new ExecutorProvider.ExtendedThreadFactory("broker-service-url-check")); scheduledCheckTask = executor.scheduleAtFixedRate(() -> { - if (closed) { - return; - } - checkPulsarServices(); - int firstHealthyPulsarService = firstHealthyPulsarService(); - if (firstHealthyPulsarService == currentPulsarServiceIndex) { - return; - } - if (firstHealthyPulsarService < 0) { - int failoverTo = findFailoverTo(); - if (failoverTo < 0) { - // No healthy pulsar service to connect. - log.error("Failed to choose a pulsar service to connect, no one pulsar service is healthy. Current" - + " pulsar service: [{}] {}. States: {}, Counters: {}", currentPulsarServiceIndex, - pulsarServiceUrlArray[currentPulsarServiceIndex], Arrays.toString(pulsarServiceStateArray), - Arrays.toString(checkCounterArray)); + try { + if (closed) { + return; + } + checkPulsarServices(); + int firstHealthyPulsarService = firstHealthyPulsarService(); + if (firstHealthyPulsarService == currentPulsarServiceIndex) { + return; + } + if (firstHealthyPulsarService < 0) { + int failoverTo = findFailoverTo(); + if (failoverTo < 0) { + // No healthy pulsar service to connect. + log.error( + "Failed to choose a pulsar service to connect, no one pulsar service is healthy." + + " Current pulsar service: [{}] {}. States: {}, Counters: {}", + currentPulsarServiceIndex, + pulsarServiceUrlArray[currentPulsarServiceIndex], + Arrays.toString(pulsarServiceStateArray), + Arrays.toString(checkCounterArray)); + } else { + // Failover to low priority pulsar service. + updateServiceUrl(failoverTo); + } } else { - // Failover to low priority pulsar service. - updateServiceUrl(failoverTo); + // Back to high priority pulsar service. + updateServiceUrl(firstHealthyPulsarService); } - } else { - // Back to high priority pulsar service. - updateServiceUrl(firstHealthyPulsarService); + } catch (Exception ex) { + log.error("Failed to re-check cluster status", ex); } }, checkHealthyIntervalMs, checkHealthyIntervalMs, TimeUnit.MILLISECONDS); } @@ -123,7 +130,7 @@ private int firstHealthyPulsarService() { } private int findFailoverTo() { - for (int i = currentPulsarServiceIndex + 1; i <= pulsarServiceUrlArray.length; i++) { + for (int i = currentPulsarServiceIndex + 1; i < pulsarServiceUrlArray.length; i++) { if (probeAvailable(i)) { return i; }