From f30870d73896cae731fbe29a464c09781e2fe49e Mon Sep 17 00:00:00 2001
From: guangning
Date: Sat, 23 Sep 2023 09:17:56 +0800
Subject: [PATCH 1/2] Fixed produce and consume
---
 .../org/apache/pulsar/broker/service/ServerCnx.java | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 5809e1297fcf6..a3d163c0a78bf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1050,9 +1050,13 @@ protected void handleConnect(CommandConnect connect) {
 .getAuthenticationProvider(originalAuthMethod);
 if (originalAuthenticationProvider == null) {
- throw new AuthenticationException(
- String.format("Can't find AuthenticationProvider for original role"
- + " using auth method [%s] is not available", originalAuthMethod));
+ authRole = getBrokerService().getAuthenticationService().getAnonymousUserRole()
+ .orElseThrow(() ->
+ new AuthenticationException("No anonymous role, and can't find "
+ + "AuthenticationProvider for original role using auth method "
+ + "[" + originalAuthMethod + "] is not available"));
+ completeConnect(clientProtocolVersion, clientVersion);
+ return;
 }
 originalAuthDataCopy = AuthData.of(connect.getOriginalAuthData().getBytes());
From 310623d157846587ef685a1a909b99a335986734 Mon Sep 17 00:00:00 2001
From: guangning
Date: Sat, 23 Sep 2023 21:34:20 +0800
Subject: [PATCH 2/2] Add comment and test
---
 .../pulsar/broker/service/ServerCnx.java | 9 ++++-
 .../pulsar/broker/service/ServerCnxTest.java | 35 +++++++++++++++++++
 2 files changed, 43 insertions(+), 1 deletion(-)
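Review bot: In the `originalAuthenticationProvider == null` branch of handleConnect, the anonymous role is now granted for every original auth method the broker has no provider for, and `completeConnect(...)` followed by `return` ends the handshake at that point. Everything later in the method is skipped on this path; if the connecting peer's own auth data is verified further down (the body continues outside the hunk), the connection would be accepted without that verification. Keep the exception for every value except the one the proxy sends for unauthenticated clients (`none`, per the comment added in patch 2/2), for example `if (originalAuthenticationProvider == null && !"none".equals(originalAuthMethod)) { throw new AuthenticationException(...); }`, and assign the anonymous role only after the connection itself has authenticated as a proxy role. The same branch is carried unchanged into the second ServerCnx.java hunk of patch 2/2.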
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index a3d163c0a78bf..0517fff0f03f5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -990,7 +990,6 @@ protected void handleConnect(CommandConnect connect) { try { byte[] authData = connect.hasAuthData() ? connect.getAuthData() : emptyArray; AuthData clientData = AuthData.of(authData); - // init authentication if (connect.hasAuthMethodName()) { authMethod = connect.getAuthMethodName(); @@ -1049,12 +1048,20 @@ protected void handleConnect(CommandConnect connect) { .getAuthenticationService() .getAuthenticationProvider(originalAuthMethod); + /** + * When both the broker and the proxy are configured with anonymousUserRole + * if the client does not configure an authentication method + * the proxy side will set the value of anonymousUserRole to clientAuthRole when it creates a connection + * and the value of clientAuthMethod will be none. + * Similarly, should also set the value of authRole to anonymousUserRole on the broker side. + */ if (originalAuthenticationProvider == null) { authRole = getBrokerService().getAuthenticationService().getAnonymousUserRole() .orElseThrow(() -> new AuthenticationException("No anonymous role, and can't find " + "AuthenticationProvider for original role using auth method " + "[" + originalAuthMethod + "] is not available")); + originalPrincipal = authRole; completeConnect(clientProtocolVersion, clientVersion); return; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 2ea5e28880bf8..5fd4881981365 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java 
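Review bot: `originalPrincipal = authRole;` in the second hunk leaves both the connection role and the original principal set to the anonymous role, so the connection no longer records which proxy role it arrived through. Authorization or audit code that tells proxied connections apart by the pair of proxy role and original principal would then see a direct anonymous connection; this should be confirmed against the readers of these fields, which are outside the hunk. The explanatory comment is written as Javadoc (`/** ... */`) inside a method body and describes a precondition (client without an auth method, clientAuthMethod equal to none) that the `if` below it does not test. A plain comment stating what the code checks would be less misleading, e.g. `// No provider for the forwarded original auth method: fall back to anonymousUserRole if one is configured.`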
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java 
@@ -506,6 +506,41 @@ public void testConnectCommandWithPassingOriginalAuthData() throws Exception { channel.finish(); } + @Test(timeOut = 30000) + public void testConnectCommandWithPassingOriginalAuthDataAndSetAnonymousUserRole() throws Exception { + AuthenticationService authenticationService = mock(AuthenticationService.class); + AuthenticationProvider authenticationProvider = new MockAuthenticationProvider(); + String authMethodName = authenticationProvider.getAuthMethodName(); + + String anonymousUserRole = "admin"; + when(brokerService.getAuthenticationService()).thenReturn(authenticationService); + when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider); + when(authenticationService.getAnonymousUserRole()).thenReturn(Optional.of(anonymousUserRole)); + svcConfig.setAuthenticationEnabled(true); + svcConfig.setAuthenticateOriginalAuthData(true); + svcConfig.setProxyRoles(Collections.singleton("pass.proxy")); + svcConfig.setAnonymousUserRole(anonymousUserRole); + + resetChannel(); + assertTrue(channel.isActive()); + assertEquals(serverCnx.getState(), State.Start); + + // When both the proxy and the broker set the anonymousUserRole option + // the proxy will use anonymousUserRole to delegate the client's role when connecting. 
+ ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.proxy", 1, null, + null, anonymousUserRole, null, null); + channel.writeInbound(clientCommand); + + Object response1 = getResponse(); + assertTrue(response1 instanceof CommandConnected); + assertEquals(serverCnx.getState(), State.Connected); + assertEquals(serverCnx.getAuthRole(), anonymousUserRole); + assertEquals(serverCnx.getPrincipal(), anonymousUserRole); + assertEquals(serverCnx.getOriginalPrincipal(), anonymousUserRole); + assertTrue(serverCnx.isActive()); + channel.finish(); + } + @Test(timeOut = 30000) public void testConnectCommandWithPassingOriginalPrincipal() throws Exception { AuthenticationService authenticationService = mock(AuthenticationService.class);
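Review bot: The new test pins only the accepting path and uses `"admin"` as the anonymous role, so a successful connect reads the same whether the role came from the fallback or from a privileged identity. Add the refusal cases next to it: with `getAnonymousUserRole()` returning `Optional.empty()` the connect must fail, and a CONNECT whose own auth data (presumably the `"pass.proxy"` argument) is not a configured proxy role must not reach `State.Connected` through this branch. A neutral value such as `String anonymousUserRole = "anonymous";` would keep the assertions from looking like an admin grant.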