Skip to content

Commit

Permalink
Pulsar proxy (apache#11)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jai Asher authored and GitHub Enterprise committed Feb 8, 2018
1 parent 5c35550 commit a86e472
Show file tree
Hide file tree
Showing 37 changed files with 3,512 additions and 372 deletions.
7 changes: 7 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,13 @@ enablePersistentTopics=true
enableNonPersistentTopics=true

### --- Authentication --- ###
# Role names that are treated as "proxy roles". If the broker sees a request with
#role as proxyRoles - it will demand to see a valid original principal.
proxyRoles=

# If this flag is set then the broker authenticates the original Auth data
# else it just accepts the originalPrincipal and authorizes it (if required).
authenticateOriginalAuthData=false

# Enable TLS
tlsEnabled=false
Expand Down
4 changes: 4 additions & 0 deletions conf/proxy.conf
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ brokerClientAuthenticationParameters=
# operations and publish/consume from all topics (comma-separated)
superUserRoles=

# Forward client authorization Credentials to Broker for re authorization
# make sure authentication is enabled for this to take effect
forwardAuthorizationCredentials=false

##### --- TLS --- #####

# Enable TLS in the proxy
Expand Down
7 changes: 7 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,13 @@ enablePersistentTopics=true
enableNonPersistentTopics=true

### --- Authentication --- ###
# Role names that are treated as "proxy roles". If the broker sees a request with
#role as proxyRoles - it will demand to see a valid original principal.
proxyRoles=

# If this flag is set then the broker authenticates the original Auth data
# else it just accepts the originalPrincipal and authorizes it (if required).
authenticateOriginalAuthData=false

# Enable authentication
authenticationEnabled=false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
// do all admin operations and publish/consume from all topics
private Set<String> superUserRoles = Sets.newTreeSet();

// Role names that are treated as "proxy roles". If the broker sees a request with
// role as proxyRoles - it will demand to see the original client role or certificate.
private Set<String> proxyRoles = Sets.newTreeSet();

// If this flag is set then the broker authenticates the original Auth data
// else it just accepts the originalPrincipal and authorizes it (if required).
private boolean authenticateOriginalAuthData = false;

// Allow wildcard matching in authorization
// (wildcard matching only applicable if wildcard-char:
// * presents at first or last position eg: *.pulsar.service, pulsar.service.*)
Expand Down Expand Up @@ -718,7 +726,15 @@ public void setAuthorizationEnabled(boolean authorizationEnabled) {
public Set<String> getSuperUserRoles() {
return superUserRoles;
}


public Set<String> getProxyRoles() {
return proxyRoles;
}

public void setProxyRoles(Set<String> proxyRoles) {
this.proxyRoles = proxyRoles;
}

public boolean getAuthorizationAllowWildcardsMatching() {
return authorizationAllowWildcardsMatching;
}
Expand Down Expand Up @@ -1225,4 +1241,12 @@ public void setPreferLaterVersions(boolean preferLaterVersions) {
public int getWebSocketConnectionsPerBroker() { return webSocketConnectionsPerBroker; }

public void setWebSocketConnectionsPerBroker(int webSocketConnectionsPerBroker) { this.webSocketConnectionsPerBroker = webSocketConnectionsPerBroker; }

public boolean authenticateOriginalAuthData() {
return authenticateOriginalAuthData;
}

public void setAuthenticateOriginalAuthData(boolean authenticateOriginalAuthData) {
this.authenticateOriginalAuthData = authenticateOriginalAuthData;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.pulsar.zookeeper.ZooKeeperCache.cacheTimeOutInSec;

Expand Down Expand Up @@ -64,8 +67,8 @@ public boolean canProduce(DestinationName destination, String role) throws Excep
log.warn("Time-out {} sec while checking authorization on {} ", cacheTimeOutInSec, destination);
throw e;
} catch (Exception e) {
log.warn("Producer-client with Role - {} failed to get permissions for destination - {}", role,
destination, e);
log.warn("Producer-client with Role - {} failed to get permissions for destination - {}. {}", role,
destination, e.getMessage());
throw e;
}
}
Expand All @@ -90,8 +93,8 @@ public boolean canConsume(DestinationName destination, String role) throws Excep
log.warn("Time-out {} sec while checking authorization on {} ", cacheTimeOutInSec, destination);
throw e;
} catch (Exception e) {
log.warn("Consumer-client with Role - {} failed to get permissions for destination - {}", role,
destination, e);
log.warn("Consumer-client with Role - {} failed to get permissions for destination - {}. {}", role,
destination, e.getMessage());
throw e;
}
}
Expand All @@ -110,8 +113,54 @@ public boolean canLookup(DestinationName destination, String role) throws Except
return canProduce(destination, role) || canConsume(destination, role);
}

private CompletableFuture<Boolean> checkAuthorization(DestinationName destination, String role,
AuthAction action) {
/**
* Check whether the specified role can perform a lookup for the specified destination.
*
* For that the caller needs to have producer or consumer permission.
*
* @param destination
* @param role
* @return
* @throws Exception
*/
public CompletableFuture<Boolean> canLookupAsync(DestinationName destination, String role) {
CompletableFuture<Boolean> finalResult = new CompletableFuture<Boolean>();
canProduceAsync(destination, role).whenComplete((produceAuthorized, ex) -> {
if (ex == null) {
if (produceAuthorized) {
finalResult.complete(produceAuthorized);
return;
}
} else {
if (log.isDebugEnabled()) {
log.debug(
"Destination [{}] Role [{}] exception occured while trying to check Produce permissions. {}",
destination.toString(), role, ex.getMessage());
}
}
canConsumeAsync(destination, role).whenComplete((consumeAuthorized, e) -> {
if (e == null) {
if (consumeAuthorized) {
finalResult.complete(consumeAuthorized);
return;
}
} else {
if (log.isDebugEnabled()) {
log.debug(
"Destination [{}] Role [{}] exception occured while trying to check Consume permissions. {}",
destination.toString(), role, e.getMessage());

}
finalResult.completeExceptionally(e);
return;
}
finalResult.complete(false);
});
});
return finalResult;
}

private CompletableFuture<Boolean> checkAuthorization(DestinationName destination, String role, AuthAction action) {
if (isSuperUser(role)) {
return CompletableFuture.completedFuture(true);
} else {
Expand Down Expand Up @@ -178,13 +227,13 @@ public CompletableFuture<Boolean> checkPermission(DestinationName destination, S
}
permissionFuture.complete(false);
}).exceptionally(ex -> {
log.warn("Client with Role - {} failed to get permissions for destination - {}", role, destination,
ex);
log.warn("Client with Role - {} failed to get permissions for destination - {}. {}", role, destination,
ex.getMessage());
permissionFuture.completeExceptionally(ex);
return null;
});
} catch (Exception e) {
log.warn("Client with Role - {} failed to get permissions for destination - {}", role, destination, e);
log.warn("Client with Role - {} failed to get permissions for destination - {}. {}", role, destination, e.getMessage());
permissionFuture.completeExceptionally(e);
}
return permissionFuture;
Expand All @@ -204,8 +253,7 @@ private boolean checkWildcardPermission(String checkedRole, AuthAction checkedAc
}

// Suffix match
if (permittedRole.charAt(0) == '*'
&& checkedRole.endsWith(permittedRole.substring(1))
if (permittedRole.charAt(0) == '*' && checkedRole.endsWith(permittedRole.substring(1))
&& permittedActions.contains(checkedAction)) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1254,7 +1254,7 @@ public static CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMet
validateAdminAccessOnProperty(pulsar, clientAppId, dn.getProperty());
} catch (RestException authException) {
log.warn("Failed to authorize {} on cluster {}", clientAppId, dn.toString());
throw new PulsarClientException(String.format("Authorization failed %s on cluster %s with error %s",
throw new PulsarClientException(String.format("Authorization failed %s on topic %s with error %s",
clientAppId, dn.toString(), authException.getMessage()));
}
} catch (Exception ex) {
Expand Down
Loading

0 comments on commit a86e472

Please sign in to comment.