PIP-15: Pulsar Functions #1314
This is probably hard to review. We have tried to make the changes happen under
Impossible to do thorough review for 25K+ LoC, but it only adds 60s to test times, and it looks good from a quick scan, so I say +1.
(full disclosure: I work for streamlio)
👍
Just left a few comments that can be addressed later.
@@ -0,0 +1,109 @@
<!--
Is this pom still needed, or should we just roll it over into the parent dist module?
Yes, we should roll it over into the parent dist module. It is probably better to do that after merging this, to avoid touching files outside the functions module.
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<properties>
I'd prefer to set all package versions (and variables) in the top-level pom so that we have a single place to check and update, unless there's a specific requirement.
yes. will do after merging
<version>3.5.1</version>
</dependency>

<dependency>
Same for these versions: they should go to the top-level pom. For protobuf, we could have two variables, `protobuf2.version` and `protobuf3.version`.
Yes, will do after merging.
@@ -0,0 +1,111 @@
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
The ASF header looks repeated twice in some Python files.
nice catch. will try to fix it or do it in a separate PR

public class CounterFunction implements PulsarFunction<String, Void> {
    @Override
    public Void process(String input, Context context) throws Exception {
Would it make sense to add a specialization of the `PulsarFunction` interface for a void function?
That way, we won't have the `Void` return type with the meaningless `return null`.
That is a good question. I think it comes down to whether we support `java.util.function.Consumer`, which is a native void function.
/cc @srkukarni
Yes, `java.util.function.Consumer` is fine and probably easy to support (without the SDK), though it would need a similar void counterpart of `PulsarFunction`, like `PulsarConsumer`... which might get confusing. :)

from pulsarfunction import pulsar_function

class Exclamation(pulsar_function.PulsarFunction):
I think the package here looks a bit redundant.
This could be shortened into:

    from pulsar import Function

    class Exclamation(Function):
        def process(self, input, context):
            return input + '!'

or

    import pulsar

    class Exclamation(pulsar.Function):
        def process(self, input, context):
            return input + '!'

We just need to figure out if there will be a clash with the `pulsar-client` Python library.
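For anyone who wants to try the shorter form locally, here is a minimal, self-contained sketch. The `Function` base class below is a hypothetical stand-in defined only for illustration; it is not the SDK class, and the final package and class names were still under discussion at this point.

```python
from abc import ABC, abstractmethod


class Function(ABC):
    """Hypothetical stand-in for the proposed `pulsar.Function` base class."""

    @abstractmethod
    def process(self, input, context):
        """Handle one input message and return the output (or None)."""


class Exclamation(Function):
    def process(self, input, context):
        # Append an exclamation mark to every incoming message.
        return input + '!'


if __name__ == "__main__":
    # No real context object is needed for this function, so None is passed.
    print(Exclamation().process("hello", None))
```

With a stand-in like this, the function body can be exercised without a running worker, which is roughly the experience the shorter import is aiming for.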
/cc @srkukarni
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pulsar.functions.worker.rest.api.v1;
Since we're already moving the REST API to v2 (and changing the major version of the project to v2), I think we should consider using `v2` for functions as well, to avoid confusion.
a good point. will fix it.
OK, since there were no objections, we're going to merge this PR.
* Relocate instance and spawner to be under `runtime` package
* fix the pom files and travis build
…JavaInstance (apache#30)

### Motivation

SerDe is a user-defined class used for serializing and deserializing objects for functions. So the initialization of a SerDe class should happen along with the Function class and be done by the context class loader. Also, SerDe should be a generic-typed interface, and its type should be consistent with the function type.

### Modifications

- make SerDe a generic-typed interface
- move SerDe initialization to JavaInstance and use the thread context class loader
- verify SerDe type
- move reflection-related code to a util class `Reflections`

### Result

- SerDe is a generic-typed interface
- JavaInstance initializes both function and serdes and verifies the types.
### Motivation

The input type can be different from the output type, so we need two serdes for each function.

### Modifications

- add inputSerde and outputSerde
- verify both input and output serde to make sure their types are consistent with the function types

### Result

Users run functions with input/output serdes.
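To illustrate why a single serde is not enough, here is a small Python sketch (not the Java implementation in this PR). All names in it (`Utf8SerDe`, `IntSerDe`, `run_function`) are hypothetical and exist only for this example. The function maps a string to an integer, so the input and output need different serdes.

```python
class Utf8SerDe:
    """Hypothetical serde for str values."""

    def serialize(self, value: str) -> bytes:
        return value.encode("utf-8")

    def deserialize(self, data: bytes) -> str:
        return data.decode("utf-8")


class IntSerDe:
    """Hypothetical serde for int values, stored as ASCII digits."""

    def serialize(self, value: int) -> bytes:
        return str(value).encode("ascii")

    def deserialize(self, data: bytes) -> int:
        return int(data.decode("ascii"))


def run_function(fn, input_serde, output_serde, payload: bytes) -> bytes:
    # Decode with the input serde, apply the function, encode with the output serde.
    return output_serde.serialize(fn(input_serde.deserialize(payload)))
```

For example, `run_function(len, Utf8SerDe(), IntSerDe(), b"hello")` decodes a string, computes its length, and encodes the integer result with the second serde.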
- the package will be generated under `dist/target/`
…#262)

* Support parallelism parameter while running functions locally
* Fixed the generation of java and python files

* Move the thread construction to start to avoid illegalstate
* Add more checks to see if thread var is init

* Use `distributedlog-core-shaded` in pulsar worker
* revert to db ledger storage
* Include netty-all
* Fix serviceUrl for functions cli
- the downloader writes the package to a temp location
- after a package is downloaded, a hard link is created to the temp file. The hard link ensures that only one createLink succeeds.
- after the hard link is created, the temp file can be deleted.

This change allows concurrent downloads without interleaving with each other.
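The download-then-link pattern described in this commit message can be sketched in Python as follows. This is an illustration of the general technique, not the worker's Java code; the function name `publish_download` and the temp-file naming are assumptions made for the example.

```python
import os
import uuid


def publish_download(data: bytes, dest_path: str) -> bool:
    """Write data to a temp file, then hard-link it to dest_path.

    Returns True if this call created dest_path, False if it already existed.
    """
    dest_dir = os.path.dirname(os.path.abspath(dest_path))
    # Keep the temp file in the destination directory so the hard link
    # never has to cross a filesystem boundary.
    tmp_path = os.path.join(dest_dir, f".{uuid.uuid4().hex}.tmp")
    with open(tmp_path, "wb") as f:
        f.write(data)
    try:
        # Creating the link fails if dest_path already exists, so only one
        # of several concurrent downloaders can publish the file.
        os.link(tmp_path, dest_path)
        return True
    except FileExistsError:
        return False
    finally:
        # The temp name is no longer needed once the link attempt is done.
        os.remove(tmp_path)
```

Because each downloader writes to its own uniquely named temp file, partial writes never appear under the final name, and a reader that sees `dest_path` always sees a complete file.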
…apache#269)

Example:

```
(CONNECTING) />
(CONNECTED) /> ls pulsar/functions/test
test-namespace
(CONNECTED) /> ls pulsar/functions/test/test-namespace
function11
function12
function13
(CONNECTED) /> ls pulsar/functions/test/test-namespace/function11
8e4dcc81-3b00-4bf9-a489-1823b5514788-pulsar-functions-api-examples.jar
(CONNECTED) /> ls pulsar/functions/test/test-namespace/function12
0ee8e267-74cb-4178-99ea-6c723298223c-pulsar-functions-api-examples.jar
(CONNECTED) /> ls pulsar/functions/test/test-namespace/function13
8f1dea11-0070-434b-ba97-a42b44d8063c-pulsar-functions-api-examples.jar
(CONNECTED) />
```
The `compile` scope in the dependency plugin means: compile, provided, and system dependencies
apache#273)

* Create ProcessBuilder during start() so that start() can be called multiple times from Spawner
* Fix unittest
- Remove function-composition
…cated license headers This addresses some comments in pulsar functions PR apache#1314
* Move pulsar functions dependency version to root pom and remove duplicated license headers

  This addresses some comments in pulsar functions PR #1314

* shade worker
* Fix broken master
* Upgrade the bookkeeper storage client dependency to the official bookkeeper version

  This removes the temp dependency in `pulsar-functions-instance`

* set `protobuf2.version` in pulsar-common
* provide a shaded worker
* include worker dependency at broker
* Embedded function worker at broker
* rename 'function worker' to 'functions worker'
* add "--no-functions-worker" for pulsar-client-cpp tests
* Move pulsar functions dependency version to root pom and remove duplicated license headers

  This addresses some comments in pulsar functions PR #1314

* shade worker
* Fix broken master
* Upgrade the bookkeeper storage client dependency to the official bookkeeper version

  This removes the temp dependency in `pulsar-functions-instance`

* set `protobuf2.version` in pulsar-common
* provide a shaded worker
* include worker dependency at broker
* Embedded function worker at broker
* rename 'function worker' to 'functions worker'
* add "--no-functions-worker" for pulsar-client-cpp tests
* Integrate function cli into pulsar-admin cli
  - rename `pulsar-client-tools-shaded` to `pulsar-client-admin-shaded-for-functions`, because this module is used by functions only to avoid protobuf conflicts
  - move protobuf3 references to Utils, so it won't be referenced outside of pulsar-functions
  - integrate function cli into pulsar-admin cli
* Fix license header issues
* Fixed ZK cache test executor configuration. Fixes #1338
…1332)

* Move pulsar functions dependency version to root pom and remove duplicated license headers

  This addresses some comments in pulsar functions PR #1314

* shade worker
* Fix broken master
* Upgrade the bookkeeper storage client dependency to the official bookkeeper version

  This removes the temp dependency in `pulsar-functions-instance`

* set `protobuf2.version` in pulsar-common
* provide a shaded worker
* include worker dependency at broker
* Embedded function worker at broker
* rename 'function worker' to 'functions worker'
* add "--no-functions-worker" for pulsar-client-cpp tests
* Integrate function cli into pulsar-admin cli
  - rename `pulsar-client-tools-shaded` to `pulsar-client-admin-shaded-for-functions`, because this module is used by functions only to avoid protobuf conflicts
  - move protobuf3 references to Utils, so it won't be referenced outside of pulsar-functions
  - integrate function cli into pulsar-admin cli
* Merge pulsar-functions dist package into pulsar binary distribution
* Fix license header issues
* Fixed ZK cache test executor configuration. Fixes #1338
This is the PR for PIP-15: https://github.com/apache/incubator-pulsar/wiki/PIP-15:-Pulsar-Functions
The branch for review: https://github.com/sijie/incubator-pulsar/tree/pulsar_functions
All code changes are made in a separate module under the directory `pulsar-functions`: https://github.com/sijie/incubator-pulsar/tree/pulsar_functions/pulsar-functions

Authors: @srkukarni @jerrypeng @sijie