Skip to content

Commit

Permalink
Add a cli command to query function state (apache#200)
Browse files Browse the repository at this point in the history
  • Loading branch information
sijie committed Mar 4, 2018
1 parent 21212a1 commit d21341c
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
*/
package org.apache.pulsar.admin.cli;

import static com.google.common.base.Preconditions.checkNotNull;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;

import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.beust.jcommander.converters.StringConverter;
Expand All @@ -27,10 +31,19 @@
import com.google.gson.JsonParser;
import com.google.gson.reflect.TypeToken;
import com.google.protobuf.util.JsonFormat;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import java.net.MalformedURLException;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import net.jodah.typetools.TypeResolver;
import org.apache.distributedlog.api.StorageClient;
import org.apache.distributedlog.api.kv.Table;
import org.apache.distributedlog.api.kv.result.KeyValue;
import org.apache.distributedlog.clients.StorageClientBuilder;
import org.apache.distributedlog.clients.config.StorageClientSettings;
import org.apache.distributedlog.clients.utils.NetUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarFunctionsAdmin;
import org.apache.pulsar.common.naming.DestinationName;
Expand Down Expand Up @@ -61,6 +74,7 @@ public class CmdFunctions extends CmdBase {
private final GetFunction getter;
private final GetFunctionStatus statuser;
private final ListFunctions lister;
private final StateGetter stateGetter;

/**
* Base command
Expand Down Expand Up @@ -509,6 +523,64 @@ void runCmd() throws Exception {
}
}

@Parameters(commandDescription = "Query Function State")
class StateGetter extends FunctionCommand {

@Parameter(names = { "-k", "--key" }, description = "key")
private String key = null;

// TODO: this url should be fetched along with bookkeeper location from pulsar admin
@Parameter(names = { "-u", "--storage-service-url" }, description = "storage service url")
private String stateStorageServiceUrl = null;

@Parameter(names = { "-w", "--watch" }, description = "watch the value changes of a key")
private boolean watch = false;

@Override
void runCmd() throws Exception {
checkNotNull(stateStorageServiceUrl, "State storage service url is missing");

String tableNs = String.format(
"%s_%s",
tenant,
namespace);

String tableName = getFunctionName();

try (StorageClient client = StorageClientBuilder.newBuilder()
.withSettings(StorageClientSettings.newBuilder()
.addEndpoints(NetUtils.parseEndpoint(stateStorageServiceUrl))
.clientName("functions-admin")
.build())
.withNamespace(tableNs)
.build()) {
try (Table<ByteBuf, ByteBuf> table = result(client.openTable(tableName))) {
long lastVersion = -1L;
do {
try (KeyValue<ByteBuf, ByteBuf> kv = result(table.getKv(Unpooled.wrappedBuffer(key.getBytes(UTF_8))))) {
if (null == kv) {
System.out.println("key '" + key + "' doesn't exist.");
} else {
if (kv.version() > lastVersion) {
if (kv.isNumber()) {
System.out.println("value = " + kv.numberValue());
} else {
System.out.println("value = " + new String(ByteBufUtil.getBytes(kv.value()), UTF_8));
}
lastVersion = kv.version();
}
}
}
if (watch) {
Thread.sleep(1000);
}
} while (watch);
}
}

}
}

public CmdFunctions(PulsarAdmin admin) {
super("functions", admin);
this.fnAdmin = (PulsarFunctionsAdmin) admin;
Expand All @@ -519,13 +591,15 @@ public CmdFunctions(PulsarAdmin admin) {
getter = new GetFunction();
statuser = new GetFunctionStatus();
lister = new ListFunctions();
stateGetter = new StateGetter();
jcommander.addCommand("localrun", getLocalRunner());
jcommander.addCommand("create", getCreater());
jcommander.addCommand("delete", getDeleter());
jcommander.addCommand("update", getUpdater());
jcommander.addCommand("get", getGetter());
jcommander.addCommand("getstatus", getStatuser());
jcommander.addCommand("list", getLister());
jcommander.addCommand("querystate", getStateGetter());
}

@VisibleForTesting
Expand Down Expand Up @@ -560,4 +634,9 @@ GetFunction getGetter() {
ListFunctions getLister() {
return lister;
}

@VisibleForTesting
StateGetter getStateGetter() {
return stateGetter;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,29 @@
*/
package org.apache.pulsar.admin.cli;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import java.io.File;
import java.net.URI;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.distributedlog.api.StorageClient;
import org.apache.distributedlog.api.kv.Table;
import org.apache.distributedlog.clients.StorageClientBuilder;
import org.apache.distributedlog.clients.config.StorageClientSettings;
import java.util.HashMap;
import java.util.Map;

Expand All @@ -50,6 +59,7 @@
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
import org.apache.pulsar.functions.proto.Function.FunctionConfig;
import org.apache.pulsar.functions.utils.Reflections;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.testng.IObjectFactory;
Expand All @@ -60,7 +70,7 @@
/**
* Unit test of {@link CmdFunctions}.
*/
@PrepareForTest({ CmdFunctions.class, Reflections.class })
@PrepareForTest({ CmdFunctions.class, Reflections.class, StorageClientBuilder.class })
@PowerMockIgnore("javax.management.*")
public class CmdFunctionsTest {

Expand Down Expand Up @@ -344,4 +354,44 @@ public void testListFunctions() throws Exception {

verify(functions, times(1)).getFunctions(eq(tenant), eq(namespace));
}

@Test
public void testStateGetter() throws Exception {
String tenant = TEST_NAME + "_tenant";
String namespace = TEST_NAME + "_namespace";
String fnName = TEST_NAME + "_function";

mockStatic(StorageClientBuilder.class);

StorageClientBuilder builder = mock(StorageClientBuilder.class);
when(builder.withSettings(any(StorageClientSettings.class))).thenReturn(builder);
when(builder.withNamespace(eq(tenant + "_" + namespace))).thenReturn(builder);
StorageClient client = mock(StorageClient.class);
when(builder.build()).thenReturn(client);

PowerMockito.when(StorageClientBuilder.class, "newBuilder")
.thenReturn(builder);

Table<ByteBuf, ByteBuf> table = mock(Table.class);
when(client.openTable(eq(fnName))).thenReturn(FutureUtils.value(table));
AtomicReference<ByteBuf> keyHolder = new AtomicReference<>();
doAnswer(invocationOnMock -> {
ByteBuf buf = invocationOnMock.getArgumentAt(0, ByteBuf.class);
keyHolder.set(buf);
return FutureUtils.value(null);
}).when(table).getKv(any(ByteBuf.class));

cmd.run(new String[] {
"querystate",
"--tenant", tenant,
"--namespace", namespace,
"--name", fnName,
"--key", "test-key",
"--storage-service-url", "127.0.0.1:4181"
});

assertEquals(
"test-key",
new String(ByteBufUtil.getBytes(keyHolder.get()), UTF_8));
}
}

0 comments on commit d21341c

Please sign in to comment.