Skip to content

Commit

Permalink
Added publish interface to the api (apache#123)
Browse files Browse the repository at this point in the history
* Added publish interface to the api

* Address comments
  • Loading branch information
srkukarni authored and sijie committed Mar 4, 2018
1 parent 5dfbb27 commit b3a8b21
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.api.examples;

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.RequestHandler;
import org.apache.pulsar.functions.api.utils.Utf8StringSerDe;

public class PublishFunction implements RequestHandler<String, Void> {
@Override
public Void handleRequest(String input, Context context) {
context.publish(context.getUserConfigValue("PublishTopic"), input + "!", Utf8StringSerDe.class);
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import org.slf4j.Logger;

import java.util.concurrent.CompletableFuture;

/**
* Context provides contextual information to the executing function.
* Features like which message id we are handling, whats the topic name of the
Expand Down Expand Up @@ -103,4 +105,13 @@ public interface Context {
* @param value The value of the metric
*/
void recordMetric(String metricName, double value);

/**
* Publish an object using serDe for serializing to the topic
* @param topicName The name of the topic for publishing
* @param object The object that needs to be published
* @param serDeClass The class that needs to be used to serialize the object before publishing
* @return
*/
CompletableFuture<Void> publish(String topicName, Object object, Class<? extends SerDe> serDeClass);
}
4 changes: 2 additions & 2 deletions pulsar-functions/conf/example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@ name: "example"
className: "org.apache.pulsar.functions.api.examples.ExclamationFunction"
inputs:
"persistent://sample/standalone/ns1/test_src" : "org.apache.pulsar.functions.runtime.serde.Utf8StringSerDe"
sinkTopic: "persistent://sample/standalone/ns1/test_result"
outputSerdeClassName: "org.apache.pulsar.functions.runtime.serde.Utf8StringSerDe"
userConfig:
"PublishTopic" : "persistent://sample/standalone/ns1/test_result"
25 changes: 25 additions & 0 deletions pulsar-functions/run-publish-example.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

bin/pulsar-functions functions localrun \
--function-config conf/example.yml \
--source-topics persistent://sample/standalone/ns1/test_src \
--input-serde-classnames org.apache.pulsar.functions.api.utils.Utf8StringSerDe \
--function-classname org.apache.pulsar.functions.api.examples.LoggingFunction \
--jar `pwd`/api-examples/target/pulsar-functions-api-examples.jar
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,22 @@

import lombok.Getter;
import lombok.Setter;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData;
import org.apache.pulsar.functions.utils.Reflections;
import org.slf4j.Logger;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

/**
* This class implements the Context interface exposed to the user.
Expand Down Expand Up @@ -66,10 +76,26 @@ public void update(double value) {

private ConcurrentMap<String, AccumulatedMetricDatum> accumulatedMetrics;

public ContextImpl(JavaInstanceConfig config, Logger logger) {
private Map<String, Producer> publishProducers;
private Map<Class<? extends SerDe>, SerDe> publishSerializers;
private ProducerConfiguration producerConfiguration;
private PulsarClient pulsarClient;
private ClassLoader classLoader;

public ContextImpl(JavaInstanceConfig config, Logger logger, PulsarClient client,
ClassLoader classLoader) {
this.config = config;
this.logger = logger;
this.pulsarClient = client;
this.classLoader = classLoader;
this.accumulatedMetrics = new ConcurrentHashMap<>();
this.publishProducers = new HashMap<>();
this.publishSerializers = new HashMap<>();
producerConfiguration = new ProducerConfiguration();
producerConfiguration.setBlockIfQueueFull(true);
producerConfiguration.setBatchingEnabled(true);
producerConfiguration.setBatchingMaxPublishDelay(1, TimeUnit.MILLISECONDS);
producerConfiguration.setMaxPendingMessages(1000000);
}

public void setCurrentMessageContext(String messageId, String topicName) {
Expand Down Expand Up @@ -137,6 +163,30 @@ public String getUserConfigValue(String key) {
}
}

@Override
public CompletableFuture<Void> publish(String topicName, Object object, Class<? extends SerDe> serDeClass) {
if (!publishProducers.containsKey(topicName)) {
try {
publishProducers.put(topicName, pulsarClient.createProducer(topicName, producerConfiguration));
} catch (PulsarClientException ex) {
CompletableFuture<Void> retval = new CompletableFuture<>();
retval.completeExceptionally(ex);
return retval;
}
}

if (!publishSerializers.containsKey(serDeClass)) {
publishSerializers.put(serDeClass, Reflections.createInstance(
serDeClass.getName(),
serDeClass,
classLoader));
}

byte[] bytes = publishSerializers.get(serDeClass).serialize(object);
return publishProducers.get(topicName).sendAsync(bytes)
.thenApply(msgId -> null);
}

@Override
public void recordMetric(String metricName, double value) {
accumulatedMetrics.putIfAbsent(metricName, new AccumulatedMetricDatum());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import lombok.extern.slf4j.Slf4j;
import net.jodah.typetools.TypeResolver;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.functions.api.RawRequestHandler;
import org.apache.pulsar.functions.api.RequestHandler;
import org.apache.pulsar.functions.proto.InstanceCommunication;
Expand Down Expand Up @@ -70,19 +71,23 @@ public class JavaInstance implements AutoCloseable {
private ExecutorService executorService;

public JavaInstance(JavaInstanceConfig config, ClassLoader clsLoader,
PulsarClient pulsarClient,
List<SerDe> inputSerDe, SerDe outputSerDe) {
this(
config,
Reflections.createInstance(
config.getFunctionConfig().getClassName(),
clsLoader), inputSerDe, outputSerDe);
clsLoader), clsLoader, pulsarClient, inputSerDe, outputSerDe);
}

JavaInstance(JavaInstanceConfig config, Object object, List<SerDe> inputSerDe, SerDe outputSerDe) {
JavaInstance(JavaInstanceConfig config, Object object,
ClassLoader clsLoader,
PulsarClient pulsarClient,
List<SerDe> inputSerDe, SerDe outputSerDe) {
// TODO: cache logger instances by functions?
Logger instanceLog = LoggerFactory.getLogger("function-" + config.getFunctionConfig().getName());

this.context = new ContextImpl(config, instanceLog);
this.context = new ContextImpl(config, instanceLog, pulsarClient, clsLoader);

// create the functions
if (object instanceof RequestHandler) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public void run() {
javaInstanceConfig.getFunctionConfig().getInputsMap().forEach((k, v) -> this.inputSerDe.put(k, initializeSerDe(v, clsLoader)));
this.outputSerDe = initializeSerDe(javaInstanceConfig.getFunctionConfig().getOutputSerdeClassName(), clsLoader);

javaInstance = new JavaInstance(javaInstanceConfig, clsLoader, new ArrayList(inputSerDe.values()), outputSerDe);
javaInstance = new JavaInstance(javaInstanceConfig, clsLoader, client, new ArrayList(inputSerDe.values()), outputSerDe);

while (true) {
JavaExecutionResult result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public void testLongRunningFunction() throws Exception {
JavaInstanceConfig config = createInstanceConfig();
config.getLimitsConfig().setMaxTimeMs(2000);
JavaInstance instance = new JavaInstance(
config, new LongRunningHandler(), Arrays.asList(Utf8StringSerDe.of()), Utf8StringSerDe.of());
config, new LongRunningHandler(), null, null, Arrays.asList(Utf8StringSerDe.of()), Utf8StringSerDe.of());
String testString = "ABC123";
JavaExecutionResult result = instance.handleMessage("1", "random", serialize(testString),
Utf8StringSerDe.of());
Expand All @@ -123,7 +123,8 @@ public void testLambda() {
JavaInstance instance = new JavaInstance(
config,
(RequestHandler<String, String>) (input, context) -> input + "-lambda",
Arrays.asList(Utf8StringSerDe.of()), Utf8StringSerDe.of());
null, null,
Arrays.asList(Utf8StringSerDe.of()), Utf8StringSerDe.of());
String testString = "ABC123";
JavaExecutionResult result = instance.handleMessage("1", "random", serialize(testString),
Utf8StringSerDe.of());
Expand All @@ -140,7 +141,7 @@ public void testUnsupportedClasses() {
JavaInstanceConfig config = createInstanceConfig();
try {
new JavaInstance(
config, new UnsupportedHandler(), Arrays.asList(Utf8StringSerDe.of()), Utf8StringSerDe.of());
config, new UnsupportedHandler(), null, null, Arrays.asList(Utf8StringSerDe.of()), Utf8StringSerDe.of());
assertFalse(true);
} catch (RuntimeException ex) {
// Good
Expand All @@ -157,7 +158,7 @@ public void testVoidInputClasses() {
JavaInstanceConfig config = createInstanceConfig();
try {
new JavaInstance(
config, new VoidInputHandler(), Arrays.asList(Utf8StringSerDe.of()), null);
config, new VoidInputHandler(), null, null, Arrays.asList(Utf8StringSerDe.of()), null);
assertFalse(true);
} catch (RuntimeException ex) {
// Good
Expand All @@ -174,7 +175,7 @@ public void testVoidOutputClasses() {
JavaInstanceConfig config = createInstanceConfig();
config.getLimitsConfig().setMaxTimeMs(2000);
JavaInstance instance = new JavaInstance(
config, new VoidOutputHandler(), Arrays.asList(Utf8StringSerDe.of()), Utf8StringSerDe.of());
config, new VoidOutputHandler(), null, null, Arrays.asList(Utf8StringSerDe.of()), Utf8StringSerDe.of());
String testString = "ABC123";
JavaExecutionResult result = instance.handleMessage("1", "r", serialize(testString),
Utf8StringSerDe.of());
Expand All @@ -197,6 +198,7 @@ public void testInconsistentInputType() {
new JavaInstance(
config,
(RequestHandler<String, String>) (input, context) -> input + "-lambda",
null, null,
Arrays.asList(JavaSerDe.of()), Utf8StringSerDe.of());
fail("Should fail constructing java instance if function type is inconsistent with serde type");
} catch (RuntimeException re) {
Expand All @@ -218,6 +220,7 @@ public void testInconsistentOutputType() {
new JavaInstance(
config,
(RequestHandler<String, String>) (input, context) -> input + "-lambda",
null, null,
Arrays.asList(Utf8StringSerDe.of()), JavaSerDe.of());
fail("Should fail constructing java instance if function type is inconsistent with serde type");
} catch (RuntimeException re) {
Expand Down

0 comments on commit b3a8b21

Please sign in to comment.