Skip to content

Commit

Permalink
Move OAuth2 token management in dedicated package
Browse files Browse the repository at this point in the history
  • Loading branch information
acogoluegnes committed Dec 20, 2024
1 parent 820921c commit eabdbbe
Show file tree
Hide file tree
Showing 15 changed files with 147 additions and 74 deletions.
10 changes: 6 additions & 4 deletions src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.rabbitmq.client.amqp.impl.Utils.RunnableWithException;
import com.rabbitmq.client.amqp.impl.Utils.StopWatch;
import com.rabbitmq.client.amqp.metrics.MetricsCollector;
import com.rabbitmq.client.amqp.oauth2.CredentialsManager;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
Expand Down Expand Up @@ -80,7 +81,7 @@ final class AmqpConnection extends ResourceBase implements Connection {
private final Lock instanceLock = new ReentrantLock();
private final boolean filterExpressionsSupported, setTokenSupported;
private volatile ExecutorService dispatchingExecutorService;
private final Credentials.Registration credentialsRegistration;
private final CredentialsManager.Registration credentialsRegistration;

AmqpConnection(AmqpConnectionBuilder builder) {
super(builder.listeners());
Expand Down Expand Up @@ -119,9 +120,9 @@ final class AmqpConnection extends ResourceBase implements Connection {
this.affinityStrategy = null;
}
this.management = createManagement();
Credentials credentials = builder.credentials();
CredentialsManager credentialsManager = builder.credentialsManager();
this.credentialsRegistration =
credentials.register(
credentialsManager.register(
this.name(),
(username, password) -> {
State state = this.state();
Expand Down Expand Up @@ -835,7 +836,8 @@ public int hashCode() {
return Objects.hashCode(id);
}

private static class TokenConnectionCallback implements Credentials.AuthenticationCallback {
private static class TokenConnectionCallback
implements CredentialsManager.AuthenticationCallback {

private final ConnectionOptions options;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package com.rabbitmq.client.amqp.impl;

import com.rabbitmq.client.amqp.*;
import com.rabbitmq.client.amqp.oauth2.CredentialsManager;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -162,8 +163,8 @@ AmqpEnvironment environment() {
return environment;
}

Credentials credentials() {
return environment().credentialsFactory().credentials(this.connectionSettings);
CredentialsManager credentialsManager() {
return environment().credentialsManagerFactory().credentials(this.connectionSettings);
}

AmqpRecoveryConfiguration recoveryConfiguration() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class AmqpEnvironment implements Environment {
private final ConnectionUtils.AffinityCache affinityCache = new ConnectionUtils.AffinityCache();
private final EventLoop recoveryEventLoop;
private final ExecutorService recoveryEventLoopExecutorService;
private final CredentialsFactory credentialsFactory = new CredentialsFactory(this);
private final CredentialsManagerFactory credentialsManagerFactory = new CredentialsManagerFactory(this);

AmqpEnvironment(
ExecutorService executorService,
Expand Down Expand Up @@ -124,8 +124,8 @@ Clock clock() {
return this.clock;
}

CredentialsFactory credentialsFactory() {
return this.credentialsFactory;
CredentialsManagerFactory credentialsManagerFactory() {
return this.credentialsManagerFactory;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,62 +19,66 @@

import com.rabbitmq.client.amqp.CredentialsProvider;
import com.rabbitmq.client.amqp.UsernamePasswordCredentialsProvider;
import com.rabbitmq.client.amqp.oauth2.CredentialsManager;
import com.rabbitmq.client.amqp.oauth2.GsonTokenParser;
import com.rabbitmq.client.amqp.oauth2.HttpTokenRequester;
import com.rabbitmq.client.amqp.oauth2.TokenCredentialsManager;
import java.net.http.HttpClient;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

final class CredentialsFactory {
final class CredentialsManagerFactory {

private volatile Credentials globalOAuth2Credentials;
private volatile CredentialsManager globalOAuth2CredentialsManager;
private final Lock oauth2CredentialsLock = new ReentrantLock();
private final AmqpEnvironment environment;

CredentialsFactory(AmqpEnvironment environment) {
CredentialsManagerFactory(AmqpEnvironment environment) {
this.environment = environment;
}

Credentials credentials(DefaultConnectionSettings<?> settings) {
CredentialsManager credentials(DefaultConnectionSettings<?> settings) {
CredentialsProvider provider = settings.credentialsProvider();
Credentials credentials;
CredentialsManager credentialsManager;
if (settings.oauth2().enabled()) {
if (settings.oauth2().shared()) {
credentials = globalOAuth2Credentials(settings);
credentialsManager = globalOAuth2Credentials(settings);
} else {
credentials = createOAuth2Credentials(settings);
credentialsManager = createOAuth2Credentials(settings);
}
} else {
if (provider instanceof UsernamePasswordCredentialsProvider) {
UsernamePasswordCredentialsProvider credentialsProvider =
(UsernamePasswordCredentialsProvider) provider;
credentials = new UsernamePasswordCredentials(credentialsProvider);
credentialsManager = new UsernamePasswordCredentialsManager(credentialsProvider);
} else {
credentials = Credentials.NO_OP;
credentialsManager = CredentialsManager.NO_OP;
}
}
return credentials;
return credentialsManager;
}

private Credentials globalOAuth2Credentials(DefaultConnectionSettings<?> connectionSettings) {
Credentials result = this.globalOAuth2Credentials;
private CredentialsManager globalOAuth2Credentials(
DefaultConnectionSettings<?> connectionSettings) {
CredentialsManager result = this.globalOAuth2CredentialsManager;
if (result != null) {
return result;
}

this.oauth2CredentialsLock.lock();
try {
if (this.globalOAuth2Credentials == null) {
this.globalOAuth2Credentials = createOAuth2Credentials(connectionSettings);
if (this.globalOAuth2CredentialsManager == null) {
this.globalOAuth2CredentialsManager = createOAuth2Credentials(connectionSettings);
}
return this.globalOAuth2Credentials;
return this.globalOAuth2CredentialsManager;
} finally {
this.oauth2CredentialsLock.unlock();
}
}

private Credentials createOAuth2Credentials(DefaultConnectionSettings<?> connectionSettings) {
private CredentialsManager createOAuth2Credentials(
DefaultConnectionSettings<?> connectionSettings) {
DefaultConnectionSettings.DefaultOAuth2Settings<?> settings = connectionSettings.oauth2();
Consumer<HttpClient.Builder> clientBuilderConsumer;
if (settings.tlsEnabled()) {
Expand All @@ -92,7 +96,7 @@ private Credentials createOAuth2Credentials(DefaultConnectionSettings<?> connect
clientBuilderConsumer,
null,
new GsonTokenParser());
return new TokenCredentials(
return new TokenCredentialsManager(
tokenRequester, environment.scheduledExecutorService(), settings.refreshDelayStrategy());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static java.util.stream.Collectors.*;

import com.rabbitmq.client.amqp.*;
import com.rabbitmq.client.amqp.oauth2.TokenCredentialsManager;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
Expand Down Expand Up @@ -512,7 +513,7 @@ static class DefaultOAuth2Settings<T> implements OAuth2Settings<T> {
private String grantType = "client_credentials";
private boolean shared = true;
private Function<Instant, Duration> refreshDelayStrategy =
TokenCredentials.DEFAULT_REFRESH_DELAY_STRATEGY;
TokenCredentialsManager.DEFAULT_REFRESH_DELAY_STRATEGY;

DefaultOAuth2Settings(DefaultConnectionSettings<T> connectionSettings) {
this.connectionSettings = connectionSettings;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@
package com.rabbitmq.client.amqp.impl;

import com.rabbitmq.client.amqp.UsernamePasswordCredentialsProvider;
import com.rabbitmq.client.amqp.oauth2.CredentialsManager;

final class UsernamePasswordCredentials implements Credentials {
final class UsernamePasswordCredentialsManager implements CredentialsManager {

private final UsernamePasswordCredentialsProvider provider;
private final Registration registration;

UsernamePasswordCredentials(UsernamePasswordCredentialsProvider provider) {
UsernamePasswordCredentialsManager(UsernamePasswordCredentialsProvider provider) {
this.provider = provider;
this.registration = new RegistrationImpl(provider);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
//
// If you have any questions regarding licensing, please contact us at
// info@rabbitmq.com.
package com.rabbitmq.client.amqp.impl;
package com.rabbitmq.client.amqp.oauth2;

interface Credentials {
public interface CredentialsManager {

Credentials NO_OP = new NoOpCredentials();
CredentialsManager NO_OP = new NoOpCredentialsManager();

Registration register(String name, AuthenticationCallback updateCallback);

Expand All @@ -35,7 +35,7 @@ interface AuthenticationCallback {
void authenticate(String username, String password);
}

class NoOpCredentials implements Credentials {
class NoOpCredentialsManager implements CredentialsManager {

@Override
public Registration register(String name, AuthenticationCallback updateCallback) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@
//
// If you have any questions regarding licensing, please contact us at
// info@rabbitmq.com.
package com.rabbitmq.client.amqp.impl;
package com.rabbitmq.client.amqp.oauth2;

import com.rabbitmq.client.amqp.oauth2.Token;
import com.rabbitmq.client.amqp.oauth2.TokenRequester;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.time.Duration;
import java.time.Instant;
Expand All @@ -39,11 +37,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class TokenCredentials implements Credentials {
public final class TokenCredentialsManager implements CredentialsManager {

static Function<Instant, Duration> DEFAULT_REFRESH_DELAY_STRATEGY =
public static Function<Instant, Duration> DEFAULT_REFRESH_DELAY_STRATEGY =
ratioRefreshDelayStrategy(0.8f);
private static final Logger LOGGER = LoggerFactory.getLogger(TokenCredentials.class);
private static final Logger LOGGER = LoggerFactory.getLogger(TokenCredentialsManager.class);

private final TokenRequester requester;
private final ScheduledExecutorService scheduledExecutorService;
Expand All @@ -55,7 +53,7 @@ final class TokenCredentials implements Credentials {
private final Function<Instant, Duration> refreshDelayStrategy;
private volatile ScheduledFuture<?> refreshTask;

TokenCredentials(
public TokenCredentialsManager(
TokenRequester requester,
ScheduledExecutorService scheduledExecutorService,
Function<Instant, Duration> refreshDelayStrategy) {
Expand All @@ -81,12 +79,15 @@ private Token getToken() {
LOGGER.debug(
"Requesting new token ({})...", registrationSummary(this.registrations.values()));
}
Utils.StopWatch stopWatch = new Utils.StopWatch();
long start = 0L;
if (LOGGER.isDebugEnabled()) {
start = System.nanoTime();
}
Token token = requester.request();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"Got new token in {} ms, token expires on {} ({})",
stopWatch.stop().toMillis(),
Duration.ofNanos(System.nanoTime() - start),
format(token.expirationTime()),
registrationSummary(this.registrations.values()));
}
Expand Down Expand Up @@ -287,7 +288,7 @@ public String toString() {
}
}

static Function<Instant, Duration> ratioRefreshDelayStrategy(float ratio) {
public static Function<Instant, Duration> ratioRefreshDelayStrategy(float ratio) {
return new RatioRefreshDelayStrategy(ratio);
}

Expand Down
8 changes: 4 additions & 4 deletions src/test/java/com/rabbitmq/client/amqp/impl/Assertions.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.qpid.protonj2.types.UnsignedLong;
import org.assertj.core.api.AbstractObjectAssert;

final class Assertions {
public final class Assertions {

private Assertions() {}

Expand All @@ -42,7 +42,7 @@ static QueueInfoAssert assertThat(Management.QueueInfo queueInfo) {
return new QueueInfoAssert(queueInfo);
}

static SyncAssert assertThat(TestUtils.Sync sync) {
public static SyncAssert assertThat(TestUtils.Sync sync) {
return new SyncAssert(sync);
}

Expand Down Expand Up @@ -83,13 +83,13 @@ CountDownLatchAssert completes(Duration timeout) {
}
}

static class SyncAssert extends AbstractObjectAssert<SyncAssert, TestUtils.Sync> {
public static class SyncAssert extends AbstractObjectAssert<SyncAssert, TestUtils.Sync> {

private SyncAssert(TestUtils.Sync sync) {
super(sync, SyncAssert.class);
}

SyncAssert completes() {
public SyncAssert completes() {
return this.completes(TestUtils.DEFAULT_CONDITION_TIMEOUT);
}

Expand Down
59 changes: 59 additions & 0 deletions src/test/java/com/rabbitmq/client/amqp/impl/OAuth2RealTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright (c) 2024 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// If you have any questions regarding licensing, please contact us at
// info@rabbitmq.com.
package com.rabbitmq.client.amqp.impl;

import com.rabbitmq.client.amqp.Connection;
import com.rabbitmq.client.amqp.Environment;
import com.rabbitmq.client.amqp.Publisher;
import java.util.UUID;
import org.junit.jupiter.api.Test;

public class OAuth2RealTest {

@Test
void test() throws Exception {
String token =
"eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJHbmwyWmxiUmgzckFyNld5bWM5ODhfNWNZN1Q1R3VlUGQ1ZHBKbFhESlVrIn0.eyJleHAiOjE3MzQ2ODI2MjksImlhdCI6MTczNDY4MjMyOSwianRpIjoiMzg0ZTY2ZTMtOGE4NC00ZmI1LWIyZjAtZDg5NDI4YzcxMjg2IiwiaXNzIjoiaHR0cHM6Ly9sb2NhbGhvc3Q6ODQ0My9yZWFsbXMvdGVzdCIsImF1ZCI6InJhYmJpdG1xIiwic3ViIjoiNTBiMzg0NjQtNDQwYy00OGQyLTg4MGQtMDUwYTdkMjhmN2IwIiwidHlwIjoiQmVhcmVyIiwiYXpwIjoicHJvZHVjZXIiLCJzZXNzaW9uX3N0YXRlIjoiOWNjNGNiZGYtYjY4ZS00NTM3LWJjNmMtMmRiOGI3NGY3MGY2IiwicmVhbG1fYWNjZXNzIjp7InJvbGVzIjpbImRlZmF1bHQtcm9sZXMtdGVzdCIsIm9mZmxpbmVfYWNjZXNzIiwidW1hX2F1dGhvcml6YXRpb24iXX0sInJlc291cmNlX2FjY2VzcyI6eyJhY2NvdW50Ijp7InJvbGVzIjpbIm1hbmFnZS1hY2NvdW50IiwibWFuYWdlLWFjY291bnQtbGlua3MiLCJ2aWV3LXByb2ZpbGUiXX19LCJzY29wZSI6InJhYmJpdG1xLnJlYWQ6Ki8qIHJhYmJpdG1xLndyaXRlOiovKiByYWJiaXRtcS5jb25maWd1cmU6Ki8qIiwic2lkIjoiOWNjNGNiZGYtYjY4ZS00NTM3LWJjNmMtMmRiOGI3NGY3MGY2IiwiY2xpZW50SWQiOiJwcm9kdWNlciIsImNsaWVudEhvc3QiOiIxNzIuMjIuMC4xIiwiY2xpZW50QWRkcmVzcyI6IjE3Mi4yMi4wLjEifQ.flIe8YhawDB4Xpp1mxbsAZtwXcaX1OSztZX7QUYmQhbDCtABH7ywAZsgnoIs6zYR1alrxvtCFF9FlHeO1hzK4KW07bba5FX2ttFnd-6Z_9uQQM9YhNH80uRfakQDr6goUSaV2vTY0DqFTMtgQIl7Bj4DFwoGzsfGAeidXpK8uFrZMjc3SONk1LdXA005jXmzjPjcdvfMGHM4RG0Rx9zvonou_SsOaEmbg026jdOlVmVsLljxkkZF5VMTQTfbtEFicPUnmAN4GdCee0gAoDu7LsgFZUWn-t6QDgLmdGtMZPo3zQcaWsKXPtuea7_FcPIO9l25zN6jeE72UwBT3_Io4g";
Environment environment =
new AmqpEnvironmentBuilder()
.connectionSettings()
.oauth2()
.tokenEndpointUri("https://localhost:8443/realms/test/protocol/openid-connect/token")
.clientId("producer")
.clientSecret("kbOFBXI9tANgKUq8vXHLhT6YhbivgXxn")
.tls()
.sslContext(TlsTestUtils.alwaysTrustSslContext())
.oauth2()
.connection()
.environmentBuilder()
.build();

Connection c = environment.connectionBuilder().build();

String q = UUID.randomUUID().toString();
c.management().queue(q).exclusive(true).declare();

c.consumerBuilder().queue(q).messageHandler((ctx, msg) -> ctx.accept()).build();

Publisher p = c.publisherBuilder().queue(q).build();
while (true) {
Thread.sleep(100);
p.publish(p.message(), ctx -> {});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import static com.rabbitmq.client.amqp.impl.JwtTestUtils.*;
import static com.rabbitmq.client.amqp.impl.TestConditions.BrokerVersion.RABBITMQ_4_1_0;
import static com.rabbitmq.client.amqp.impl.TestUtils.*;
import static com.rabbitmq.client.amqp.impl.TokenCredentials.ratioRefreshDelayStrategy;
import static com.rabbitmq.client.amqp.oauth2.OAuth2TestUtils.sampleJsonToken;
import static com.rabbitmq.client.amqp.oauth2.TokenCredentialsManager.ratioRefreshDelayStrategy;
import static java.lang.System.currentTimeMillis;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.time.Duration.*;
Expand Down
Loading

0 comments on commit eabdbbe

Please sign in to comment.