Skip to content

Commit

Permalink
Reconcile (apache#40)
Browse files Browse the repository at this point in the history
* Create pulsar-functions module (#1)

* Create pulsar-functions module

* rename `sdk` package to `api`

* Added the first cut of the Java interface for Pulsar functions (#2)

* Reconciled the interface between worker and client with Jerry

* Fixed the update method
  • Loading branch information
srkukarni authored and sijie committed Mar 4, 2018
1 parent 5c1e369 commit 4b4bea7
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 13 deletions.
3 changes: 2 additions & 1 deletion pulsar-functions/run-examples.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@

bin/pulsar-functions functions localrun \
--function-config conf/example.yml \
--sink-topic persistent://sample/standalone/ns1/test_result \
--source-topic persistent://sample/standalone/ns1/test_src \
--sink-topic persistent://sample/standalone/ns1/test_result \
--input-serde-classname org.apache.pulsar.functions.runtime.serde.Utf8StringSerDe \
--output-serde-classname org.apache.pulsar.functions.runtime.serde.Utf8StringSerDe \
--function-classname org.apache.pulsar.functions.api.examples.ExclamationFunction \
--function-classpath `pwd`/api-examples/target/pulsar-functions-api-examples.jar
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public interface Functions {
* @throws PulsarAdminException
* Unexpected error
*/
void updateFunction(FunctionConfig functionConfig) throws PulsarAdminException;
void updateFunction(FunctionConfig functionConfig, byte[] code) throws PulsarAdminException;

/**
* Delete an existing function
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,21 +61,21 @@ public FunctionConfig getFunction(String tenant, String namespace, String functi
}

@Override
public void createFunction(FunctionConfig functionData, byte[] code) throws PulsarAdminException {
public void createFunction(FunctionConfig functionConfig, byte[] code) throws PulsarAdminException {
try {
final FormDataMultiPart mp = new FormDataMultiPart();
mp.bodyPart(new FormDataBodyPart("code", code, MediaType.APPLICATION_OCTET_STREAM_TYPE));
mp.bodyPart(new FormDataBodyPart("sourceTopic", functionData.getSourceTopic(),
mp.bodyPart(new FormDataBodyPart("data", code, MediaType.APPLICATION_OCTET_STREAM_TYPE));
mp.bodyPart(new FormDataBodyPart("sourceTopic", functionConfig.getSourceTopic(),
MediaType.APPLICATION_JSON_TYPE));
mp.bodyPart(new FormDataBodyPart("sinkTopic", functionData.getSinkTopic(),
mp.bodyPart(new FormDataBodyPart("sinkTopic", functionConfig.getSinkTopic(),
MediaType.APPLICATION_JSON_TYPE));
mp.bodyPart(new FormDataBodyPart("inputSerdeClassName", functionData.getInputSerdeClassName(),
mp.bodyPart(new FormDataBodyPart("inputSerdeClassName", functionConfig.getInputSerdeClassName(),
MediaType.APPLICATION_JSON_TYPE));
mp.bodyPart(new FormDataBodyPart("outputSerdeClassName", functionData.getOutputSerdeClassName(),
mp.bodyPart(new FormDataBodyPart("outputSerdeClassName", functionConfig.getOutputSerdeClassName(),
MediaType.APPLICATION_JSON_TYPE));
mp.bodyPart(new FormDataBodyPart("className", functionData.getClassName(),
mp.bodyPart(new FormDataBodyPart("className", functionConfig.getClassName(),
MediaType.APPLICATION_JSON_TYPE));
request(functions.path(functionData.getTenant()).path(functionData.getNameSpace()).path(functionData.getName()))
request(functions.path(functionConfig.getTenant()).path(functionConfig.getNameSpace()).path(functionConfig.getName()))
.put(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
Expand All @@ -93,11 +93,34 @@ public void deleteFunction(String cluster, String namespace, String function) th
}

@Override
public void updateFunction(FunctionConfig functionConfig)
throws PulsarAdminException {
public void updateFunction(FunctionConfig functionConfig, byte[] code) throws PulsarAdminException {
try {
final FormDataMultiPart mp = new FormDataMultiPart();
if (code != null) {
mp.bodyPart(new FormDataBodyPart("data", code, MediaType.APPLICATION_OCTET_STREAM_TYPE));
}
if (functionConfig.getSourceTopic() != null) {
mp.bodyPart(new FormDataBodyPart("sourceTopic", functionConfig.getSourceTopic(),
MediaType.APPLICATION_JSON_TYPE));
}
if (functionConfig.getSinkTopic() != null) {
mp.bodyPart(new FormDataBodyPart("sinkTopic", functionConfig.getSinkTopic(),
MediaType.APPLICATION_JSON_TYPE));
}
if (functionConfig.getInputSerdeClassName() != null) {
mp.bodyPart(new FormDataBodyPart("inputSerdeClassName", functionConfig.getInputSerdeClassName(),
MediaType.APPLICATION_JSON_TYPE));
}
if (functionConfig.getOutputSerdeClassName() != null) {
mp.bodyPart(new FormDataBodyPart("outputSerdeClassName", functionConfig.getOutputSerdeClassName(),
MediaType.APPLICATION_JSON_TYPE));
}
if (functionConfig.getClassName() != null) {
mp.bodyPart(new FormDataBodyPart("className", functionConfig.getClassName(),
MediaType.APPLICATION_JSON_TYPE));
}
request(functions.path(functionConfig.getTenant()).path(functionConfig.getNameSpace()).path(functionConfig.getName()))
.post(Entity.entity(functionConfig, MediaType.MULTIPART_FORM_DATA), ErrorData.class);
.post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
}
Expand Down

0 comments on commit 4b4bea7

Please sign in to comment.