Skip to content

Commit

Permalink
[authentication] Update original auth data after auth data refresh (#…
Browse files Browse the repository at this point in the history
…21660)

Co-authored-by: Zixuan Liu <nodeces@gmail.com>
  • Loading branch information
gaoran10 and nodece authored Dec 8, 2023
1 parent 207c20d commit aab1d1e
Show file tree
Hide file tree
Showing 6 changed files with 170 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -654,24 +654,14 @@ private void doAuthentication(AuthData clientData,

String newAuthRole = authState.getAuthRole();

// Refresh the auth data.
this.authenticationData = authState.getAuthDataSource();
if (log.isDebugEnabled()) {
log.debug("[{}] Auth data refreshed for role={}", remoteAddress, this.authRole);
}

if (!useOriginalAuthState) {
this.authRole = newAuthRole;
}

if (log.isDebugEnabled()) {
log.debug("[{}] Client successfully authenticated with {} role {} and originalPrincipal {}",
remoteAddress, authMethod, this.authRole, originalPrincipal);
}

AuthenticationDataSource newAuthDataSource = authState.getAuthDataSource();
if (state != State.Connected) {
// First time authentication is done
if (service.isAuthenticationEnabled()) {
if (!useOriginalAuthState) {
this.authRole = newAuthRole;
this.authenticationData = newAuthDataSource;
}
if (service.isAuthorizationEnabled()) {
if (!service.getAuthorizationService()
.isValidOriginalPrincipal(this.authRole, originalPrincipal, remoteAddress, false)) {
Expand All @@ -686,7 +676,16 @@ private void doAuthentication(AuthData clientData,
maybeScheduleAuthenticationCredentialsRefresh();
}
completeConnect(clientProtocolVersion, clientVersion);
if (log.isDebugEnabled()) {
log.debug("[{}] Client successfully authenticated with {} role {} and originalPrincipal {}",
remoteAddress, authMethod, this.authRole, originalPrincipal);
}
} else {
if (!useOriginalAuthState) {
this.authenticationData = newAuthDataSource;
} else {
this.originalAuthData = newAuthDataSource;
}
// If the connection was already ready, it means we're doing a refresh
if (!StringUtils.isEmpty(authRole)) {
if (!authRole.equals(newAuthRole)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,56 +18,21 @@
*/
package org.apache.pulsar.broker.auth;

import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.common.api.AuthData;

import javax.naming.AuthenticationException;

import static java.nio.charset.StandardCharsets.UTF_8;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;

/**
* Class to use when verifying the behavior around expired authentication data because it will always return
* true when isExpired is called.
*/
public class MockAlwaysExpiredAuthenticationState implements AuthenticationState {
final MockAlwaysExpiredAuthenticationProvider provider;
AuthenticationDataSource authenticationDataSource;
volatile String authRole;

MockAlwaysExpiredAuthenticationState(MockAlwaysExpiredAuthenticationProvider provider) {
this.provider = provider;
}


@Override
public String getAuthRole() throws AuthenticationException {
if (authRole == null) {
throw new AuthenticationException("Must authenticate first.");
}
return authRole;
}
public class MockAlwaysExpiredAuthenticationState extends MockMutableAuthenticationState {

@Override
public AuthData authenticate(AuthData authData) throws AuthenticationException {
authenticationDataSource = new AuthenticationDataCommand(new String(authData.getBytes(), UTF_8));
authRole = provider.authenticate(authenticationDataSource);
return null;
}

@Override
public AuthenticationDataSource getAuthDataSource() {
return authenticationDataSource;
}

@Override
public boolean isComplete() {
return authRole != null;
MockAlwaysExpiredAuthenticationState(AuthenticationProvider provider) {
super(provider);
}

@Override
public boolean isExpired() {
return true;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.auth;

import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.common.api.AuthData;
import javax.net.ssl.SSLSession;
import java.net.SocketAddress;

public class MockMutableAuthenticationProvider extends MockAuthenticationProvider {
public AuthenticationState newAuthState(AuthData authData,
SocketAddress remoteAddress,
SSLSession sslSession) {
return new MockMutableAuthenticationState(this);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.auth;

import static java.nio.charset.StandardCharsets.UTF_8;
import javax.naming.AuthenticationException;
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.common.api.AuthData;

// MockMutableAuthenticationState always update the authentication data source and auth role.
public class MockMutableAuthenticationState implements AuthenticationState {

final AuthenticationProvider provider;
AuthenticationDataSource authenticationDataSource;
volatile String authRole;

MockMutableAuthenticationState(AuthenticationProvider provider) {
this.provider = provider;
}

@Override
public String getAuthRole() throws AuthenticationException {
if (authRole == null) {
throw new AuthenticationException("Must authenticate first.");
}
return authRole;
}

@Override
public AuthData authenticate(AuthData authData) throws AuthenticationException {
authenticationDataSource = new AuthenticationDataCommand(new String(authData.getBytes(), UTF_8));
authRole = provider.authenticate(authenticationDataSource);
return null;
}

@Override
public AuthenticationDataSource getAuthDataSource() {
return authenticationDataSource;
}

@Override
public boolean isComplete() {
return authRole != null;
}

@Override
public boolean isExpired() {
return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -248,13 +248,8 @@ protected void setup() throws Exception {
.brokerServiceUrl(pulsar4.getBrokerServiceUrl())
.brokerServiceUrlTls(pulsar4.getBrokerServiceUrlTls())
.brokerClientTlsEnabled(true)
.brokerClientCertificateFilePath(clientCertFilePath)
.brokerClientKeyFilePath(clientKeyFilePath)
.brokerClientTrustCertsFilePath(caCertFilePath)
.brokerClientTlsEnabledWithKeyStore(tlsWithKeyStore)
.brokerClientTlsKeyStore(clientKeyStorePath)
.brokerClientTlsKeyStorePassword(keyStorePassword)
.brokerClientTlsKeyStoreType(keyStoreType)
.brokerClientTlsTrustStore(clientTrustStorePath)
.brokerClientTlsTrustStorePassword(keyStorePassword)
.brokerClientTlsTrustStoreType(keyStoreType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import org.apache.pulsar.broker.TransactionMetadataStoreService;
import org.apache.pulsar.broker.auth.MockAlwaysExpiredAuthenticationProvider;
import org.apache.pulsar.broker.auth.MockAuthorizationProvider;
import org.apache.pulsar.broker.auth.MockMutableAuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
import org.apache.pulsar.broker.auth.MockAuthenticationProvider;
import org.apache.pulsar.broker.auth.MockMultiStageAuthenticationProvider;
Expand Down Expand Up @@ -123,7 +124,6 @@
import org.apache.pulsar.common.api.proto.CommandProducerSuccess;
import org.apache.pulsar.common.api.proto.CommandSendError;
import org.apache.pulsar.common.api.proto.CommandSendReceipt;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.CommandSuccess;
Expand Down Expand Up @@ -1368,6 +1368,54 @@ public void testVerifyAuthRoleAndAuthDataFromDirectConnectionBroker() throws Exc
}));
}

@Test
public void testRefreshOriginalPrincipalWithAuthDataForwardedFromProxy() throws Exception {
AuthenticationService authenticationService = mock(AuthenticationService.class);
AuthenticationProvider authenticationProvider = new MockMutableAuthenticationProvider();
String authMethodName = authenticationProvider.getAuthMethodName();
when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
svcConfig.setAuthenticationEnabled(true);
svcConfig.setAuthenticateOriginalAuthData(true);
svcConfig.setProxyRoles(Collections.singleton("pass.proxy"));

resetChannel();
assertTrue(channel.isActive());
assertEquals(serverCnx.getState(), State.Start);

String proxyRole = "pass.proxy";
String clientRole = "pass.client";
ByteBuf connect = Commands.newConnect(authMethodName, proxyRole, "test", "localhost",
clientRole, clientRole, authMethodName);
channel.writeInbound(connect);
Object connectResponse = getResponse();
assertTrue(connectResponse instanceof CommandConnected);
assertEquals(serverCnx.getOriginalAuthData().getCommandData(), clientRole);
assertEquals(serverCnx.getOriginalAuthState().getAuthRole(), clientRole);
assertEquals(serverCnx.getOriginalPrincipal(), clientRole);
assertEquals(serverCnx.getAuthData().getCommandData(), proxyRole);
assertEquals(serverCnx.getAuthRole(), proxyRole);
assertEquals(serverCnx.getAuthState().getAuthRole(), proxyRole);

// Request refreshing the original auth.
// Expected:
// 1. Original role and original data equals to "pass.RefreshOriginAuthData".
// 2. The broker disconnects the client, because the new role doesn't equal the old role.
String newClientRole = "pass.RefreshOriginAuthData";
ByteBuf refreshAuth = Commands.newAuthResponse(authMethodName,
AuthData.of(newClientRole.getBytes(StandardCharsets.UTF_8)), 0, "test");
channel.writeInbound(refreshAuth);

assertEquals(serverCnx.getOriginalAuthData().getCommandData(), newClientRole);
assertEquals(serverCnx.getOriginalAuthState().getAuthRole(), newClientRole);
assertEquals(serverCnx.getAuthData().getCommandData(), proxyRole);
assertEquals(serverCnx.getAuthRole(), proxyRole);
assertEquals(serverCnx.getAuthState().getAuthRole(), proxyRole);

assertFalse(channel.isOpen());
assertFalse(channel.isActive());
}

@Test(timeOut = 30000)
public void testProducerCommand() throws Exception {
resetChannel();
Expand Down

0 comments on commit aab1d1e

Please sign in to comment.