Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] fix Update contains no change error when use --update-auth-data flag to update function/sink/source #19450

Merged
merged 3 commits into from
Jun 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -748,7 +748,7 @@ private static void verifyNoTopicClash(Collection<String> inputTopics, String ou
}
}

private static void doCommonChecks(FunctionConfig functionConfig) {
public static void doCommonChecks(FunctionConfig functionConfig) {
if (isEmpty(functionConfig.getTenant())) {
throw new IllegalArgumentException("Function tenant cannot be null");
}
Expand Down Expand Up @@ -890,7 +890,7 @@ private static void doCommonChecks(FunctionConfig functionConfig) {
}
}

private static Collection<String> collectAllInputTopics(FunctionConfig functionConfig) {
public static Collection<String> collectAllInputTopics(FunctionConfig functionConfig) {
List<String> retval = new LinkedList<>();
if (functionConfig.getInputs() != null) {
retval.addAll(functionConfig.getInputs());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ public static ExtractedSinkDetails validateAndExtractDetails(SinkConfig sinkConf
return new ExtractedSinkDetails(sinkClassName, typeArg.getName(), functionClassName);
}

private static Collection<String> collectAllInputTopics(SinkConfig sinkConfig) {
public static Collection<String> collectAllInputTopics(SinkConfig sinkConfig) {
List<String> retval = new LinkedList<>();
if (sinkConfig.getInputs() != null) {
retval.addAll(sinkConfig.getInputs());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,8 @@ public void updateFunction(final String tenant,
throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
}

if (existingFunctionConfig.equals(mergedConfig) && isBlank(functionPkgUrl) && uploadedInputStream == null) {
if (existingFunctionConfig.equals(mergedConfig) && isBlank(functionPkgUrl) && uploadedInputStream == null
&& (updateOptions == null || !updateOptions.isUpdateAuthData())) {
log.error("{}/{}/{} Update contains no changes", tenant, namespace, functionName);
throw new RestException(Response.Status.BAD_REQUEST, "Update contains no change");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,8 @@ public void updateSink(final String tenant,
throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
}

if (existingSinkConfig.equals(mergedConfig) && isBlank(sinkPkgUrl) && uploadedInputStream == null) {
if (existingSinkConfig.equals(mergedConfig) && isBlank(sinkPkgUrl) && uploadedInputStream == null
&& (updateOptions == null || !updateOptions.isUpdateAuthData())) {
log.error("{}/{}/{} Update contains no changes", tenant, namespace, sinkName);
throw new RestException(Response.Status.BAD_REQUEST, "Update contains no change");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,8 @@ public void updateSource(final String tenant,
throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
}

if (existingSourceConfig.equals(mergedConfig) && isBlank(sourcePkgUrl) && uploadedInputStream == null) {
if (existingSourceConfig.equals(mergedConfig) && isBlank(sourcePkgUrl) && uploadedInputStream == null
&& (updateOptions == null || !updateOptions.isUpdateAuthData())) {
log.error("{}/{}/{} Update contains no changes", tenant, namespace, sourceName);
throw new RestException(Response.Status.BAD_REQUEST, "Update contains no change");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.FileInputStream;
Expand Down Expand Up @@ -57,6 +59,7 @@
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Tenants;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.UpdateOptionsImpl;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -604,6 +607,96 @@ public void testUpdateMissingFunctionConfig() {
null, null);
}

@Test
public void testUpdateSourceWithNoChange() throws ClassNotFoundException {
mockWorkerUtils();

FunctionDetails functionDetails = createDefaultFunctionDetails();
NarClassLoader mockedClassLoader = mock(NarClassLoader.class);
mockStatic(FunctionCommon.class, ctx -> {
ctx.when(() -> FunctionCommon.getFunctionTypes(any(FunctionConfig.class), any(Class.class))).thenReturn(new Class[]{String.class, String.class});
ctx.when(() -> FunctionCommon.convertRuntime(any(FunctionConfig.Runtime.class))).thenCallRealMethod();
ctx.when(() -> FunctionCommon.isFunctionCodeBuiltin(any())).thenReturn(true);
ctx.when(() -> FunctionCommon.getClassLoaderFromPackage(any(),any(),any(),any())).thenCallRealMethod();
ctx.when(FunctionCommon::createPkgTempFile).thenCallRealMethod();
});

doReturn(Function.class).when(mockedClassLoader).loadClass(anyString());

FunctionsManager mockedFunctionsManager = mock(FunctionsManager.class);
FunctionArchive functionArchive = FunctionArchive.builder()
.classLoader(mockedClassLoader)
.build();
when(mockedFunctionsManager.getFunction("exclamation")).thenReturn(functionArchive);
when(mockedFunctionsManager.getFunctionArchive(any())).thenReturn(getPulsarApiExamplesNar().toPath());

when(mockedWorkerService.getFunctionsManager()).thenReturn(mockedFunctionsManager);
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);

// No change on config,
FunctionConfig funcConfig = createDefaultFunctionConfig();
mockStatic(FunctionConfigUtils.class, ctx -> {
ctx.when(() -> FunctionConfigUtils.convertFromDetails(any())).thenReturn(funcConfig);
ctx.when(() -> FunctionConfigUtils.validateUpdate(any(), any())).thenCallRealMethod();
ctx.when(() -> FunctionConfigUtils.convert(any(FunctionConfig.class), any(ClassLoader.class))).thenReturn(functionDetails);
ctx.when(() -> FunctionConfigUtils.convert(any(FunctionConfig.class), any(FunctionConfigUtils.ExtractedFunctionDetails.class))).thenReturn(functionDetails);
ctx.when(() -> FunctionConfigUtils.validateJavaFunction(any(), any())).thenCallRealMethod();
ctx.when(() -> FunctionConfigUtils.doCommonChecks(any())).thenCallRealMethod();
ctx.when(() -> FunctionConfigUtils.collectAllInputTopics(any())).thenCallRealMethod();
ctx.when(() -> FunctionConfigUtils.doJavaChecks(any(), any())).thenCallRealMethod();
});

// config has not changes and don't update auth, should fail
try {
resource.updateFunction(
funcConfig.getTenant(),
funcConfig.getNamespace(),
funcConfig.getName(),
null,
mockedFormData,
null,
funcConfig,
null,
null);
fail("Update without changes should fail");
} catch (RestException e) {
assertTrue(e.getMessage().contains("Update contains no change"));
}

try {
UpdateOptionsImpl updateOptions = new UpdateOptionsImpl();
updateOptions.setUpdateAuthData(false);
resource.updateFunction(
funcConfig.getTenant(),
funcConfig.getNamespace(),
funcConfig.getName(),
null,
mockedFormData,
null,
funcConfig,
null,
updateOptions);
fail("Update without changes should fail");
} catch (RestException e) {
assertTrue(e.getMessage().contains("Update contains no change"));
}

// no changes but set the auth-update flag to true, should not fail
UpdateOptionsImpl updateOptions = new UpdateOptionsImpl();
updateOptions.setUpdateAuthData(true);
resource.updateFunction(
funcConfig.getTenant(),
funcConfig.getNamespace(),
funcConfig.getName(),
null,
mockedFormData,
null,
funcConfig,
null,
updateOptions);
}


private void registerDefaultFunction() {
registerDefaultFunctionWithPackageUrl(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,6 @@
*/
package org.apache.pulsar.functions.worker.rest.api.v3;

import static org.apache.pulsar.functions.proto.Function.ProcessingGuarantees.ATLEAST_ONCE;
import static org.apache.pulsar.functions.source.TopicSchema.DEFAULT_SERDE;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.FileInputStream;
Expand Down Expand Up @@ -60,6 +45,7 @@
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Tenants;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.UpdateOptionsImpl;
import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.nar.NarClassLoader;
Expand Down Expand Up @@ -97,6 +83,23 @@
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import static org.apache.pulsar.functions.proto.Function.ProcessingGuarantees.ATLEAST_ONCE;
import static org.apache.pulsar.functions.source.TopicSchema.DEFAULT_SERDE;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

/**
* Unit test of {@link SinksApiV3Resource}.
Expand Down Expand Up @@ -965,29 +968,7 @@ private void testUpdateSinkMissingArguments(
String className,
Integer parallelism,
String expectedError) throws Exception {
mockStatic(ConnectorUtils.class, ctx -> {
ctx.when(() -> ConnectorUtils.getIOSinkClass(any(NarClassLoader.class)))
.thenReturn(CASSANDRA_STRING_SINK);
});

mockStatic(ClassLoaderUtils.class, ctx -> {
});

mockStatic(FunctionCommon.class, ctx -> {
ctx.when(() -> FunctionCommon.createPkgTempFile()).thenCallRealMethod();
ctx.when(() -> FunctionCommon.getClassLoaderFromPackage(any(), any(), any(), any())).thenCallRealMethod();
ctx.when(() -> FunctionCommon.getSinkType(any())).thenReturn(String.class);
ctx.when(() -> FunctionCommon.extractNarClassLoader(any(), any())).thenReturn(mock(NarClassLoader.class));
ctx.when(() -> FunctionCommon
.convertProcessingGuarantee(eq(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE)))
.thenReturn(ATLEAST_ONCE);
});

this.mockedFunctionMetaData =
FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(sink))).thenReturn(mockedFunctionMetaData);

when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
mockFunctionCommon(tenant, namespace, sink);

SinkConfig sinkConfig = new SinkConfig();
if (tenant != null) {
Expand Down Expand Up @@ -1026,6 +1007,32 @@ private void testUpdateSinkMissingArguments(

}

private void mockFunctionCommon(String tenant, String namespace, String sink) throws IOException {
mockStatic(ConnectorUtils.class, ctx -> {
ctx.when(() -> ConnectorUtils.getIOSinkClass(any(NarClassLoader.class)))
.thenReturn(CASSANDRA_STRING_SINK);
});

mockStatic(ClassLoaderUtils.class, ctx -> {
});

mockStatic(FunctionCommon.class, ctx -> {
ctx.when(() -> FunctionCommon.createPkgTempFile()).thenCallRealMethod();
ctx.when(() -> FunctionCommon.getClassLoaderFromPackage(any(), any(), any(), any())).thenCallRealMethod();
ctx.when(() -> FunctionCommon.getSinkType(any())).thenReturn(String.class);
ctx.when(() -> FunctionCommon.extractNarClassLoader(any(), any())).thenReturn(mock(NarClassLoader.class));
ctx.when(() -> FunctionCommon
.convertProcessingGuarantee(eq(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE)))
.thenReturn(ATLEAST_ONCE);
});

this.mockedFunctionMetaData =
FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(sink))).thenReturn(mockedFunctionMetaData);

when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
}

private void updateDefaultSink() throws Exception {
updateDefaultSinkWithPackageUrl(null);
}
Expand Down Expand Up @@ -1848,4 +1855,72 @@ public void testRegisterSinkSuccessK8sWithUpload() throws Exception {
}
}
}

@Test
public void testUpdateSinkWithNoChange() throws IOException {
mockWorkerUtils();

// No change on config,
SinkConfig sinkConfig = createDefaultSinkConfig();

mockStatic(SinkConfigUtils.class, ctx -> {
ctx.when(() -> SinkConfigUtils.convertFromDetails(any())).thenReturn(sinkConfig);
ctx.when(() -> SinkConfigUtils.convert(any(), any())).thenCallRealMethod();
ctx.when(() -> SinkConfigUtils.validateUpdate(any(), any())).thenCallRealMethod();
ctx.when(() -> SinkConfigUtils.clone(any())).thenCallRealMethod();
ctx.when(() -> SinkConfigUtils.collectAllInputTopics(any())).thenCallRealMethod();
ctx.when(() -> SinkConfigUtils.validateAndExtractDetails(any(),any(),any(),anyBoolean())).thenCallRealMethod();
});

mockFunctionCommon(sinkConfig.getTenant(), sinkConfig.getNamespace(), sinkConfig.getName());

// config has not changes and don't update auth, should fail
try {
resource.updateSink(
sinkConfig.getTenant(),
sinkConfig.getNamespace(),
sinkConfig.getName(),
null,
mockedFormData,
null,
sinkConfig,
null,
null);
fail("Update without changes should fail");
} catch (RestException e) {
assertTrue(e.getMessage().contains("Update contains no change"));
}

try {
UpdateOptionsImpl updateOptions = new UpdateOptionsImpl();
updateOptions.setUpdateAuthData(false);
resource.updateSink(
sinkConfig.getTenant(),
sinkConfig.getNamespace(),
sinkConfig.getName(),
null,
mockedFormData,
null,
sinkConfig,
null,
updateOptions);
fail("Update without changes should fail");
} catch (RestException e) {
assertTrue(e.getMessage().contains("Update contains no change"));
}

// no changes but set the auth-update flag to true, should not fail
UpdateOptionsImpl updateOptions = new UpdateOptionsImpl();
updateOptions.setUpdateAuthData(true);
resource.updateSink(
sinkConfig.getTenant(),
sinkConfig.getNamespace(),
sinkConfig.getName(),
null,
mockedFormData,
null,
sinkConfig,
null,
updateOptions);
}
}
Loading