diff --git a/pulsar-functions/api-python/context.py b/pulsar-functions/api-python/context.py new file mode 100644 index 0000000000000..a8e2ea07f0e49 --- /dev/null +++ b/pulsar-functions/api-python/context.py @@ -0,0 +1,91 @@ +#!/usr/bin/env python +# -*- encoding: utf-8 -*- + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +"""context.py: Context defines context information available during +# processing of a request. +""" +from abc import abstractmethod + +class Context(object): + """Interface defining information available at process time""" + @abstractmethod + def get_message_id(self): + """Return the messageid of the current message that we are processing""" + pass + + @abstractmethod + def get_topic_name(self): + """Returns the topic name of the message that we are processing""" + pass + + @abstractmethod + def get_function_name(self): + """Returns the function name that we are a part of""" + pass + + @abstractmethod + def get_function_id(self): + """Returns the function id that we are a part of""" + pass + + @abstractmethod + def get_instance_id(self): + """Returns the instance id that is executing the function""" + pass + + @abstractmethod + def get_function_version(self): + """Returns the version of function that we are executing""" + pass + + @abstractmethod + def get_memory_limit(self): + """Returns the memory limit in bytes of function that we are executing""" + pass + + @abstractmethod + def get_time_budget_ms(self): + """Returns the time limit in milliseconds of function that we are executing""" + pass + + @abstractmethod + def get_remaining_time_ms(self): + """Returns the remaining time in milliseconds of function that we are executing""" + pass + + @abstractmethod + def get_logger(self): + """Returns the logger object that can be used to do logging""" + pass + + @abstractmethod + def get_user_config_value(self, key): + """Returns the value of the user defined config. If the key doesnt exist return null instead""" + pass + + @abstractmethod + def record_metric(self, metric_name, metric_value): + """Records the metric_value. metric_value has to satisfy isinstance(metric_value, numbers.Number)""" + pass + + @abstractmethod + def publish(self, topic_name, message, serde_class_name): + """Publishes message to topic_name by first serializing the message using serde_class_name serde""" + pass \ No newline at end of file diff --git a/pulsar-functions/api-python/pulsar_function.py b/pulsar-functions/api-python/pulsar_function.py new file mode 100644 index 0000000000000..a87c644f44df4 --- /dev/null +++ b/pulsar-functions/api-python/pulsar_function.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python +# -*- encoding: utf-8 -*- + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +"""pulsar_function.py: This is the core interface of the function api. +# The process method is called for every message of the input topic of the +# function. The incoming input bytes are deserialized using the serde. +# The process function can optionally emit an output +""" +from abc import abstractmethod + +class PulsarFunction(object): + """Interface for Heron Metric""" + @abstractmethod + def process(self, input, context): + """Process input message""" + pass \ No newline at end of file diff --git a/pulsar-functions/api-python/serde.py b/pulsar-functions/api-python/serde.py new file mode 100644 index 0000000000000..43ff04d2f2ac2 --- /dev/null +++ b/pulsar-functions/api-python/serde.py @@ -0,0 +1,49 @@ +#!/usr/bin/env python +# -*- encoding: utf-8 -*- + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +"""serde.py: SerDe defines the interface for serialization/deserialization. +# Everytime a message is read from pulsar topic, the serde is invoked to +# serialize the bytes into an object before invoking the process method. +# Anytime a python object needs to be written back to pulsar, it is +# serialized into bytes before writing. +""" +from abc import abstractmethod + +import pickle + +class SerDe(object): + """Interface for Serialization/Deserialization""" + @abstractmethod + def serialize(self, input): + """Serialize input message into bytes""" + pass + + @abstractmethod + def deserialize(self, input_bytes): + """Serialize input_bytes into an object""" + pass + +class PickleSerDe(SerDe): + """Pickle based serializer""" + def serialize(self, input): + return pickle.dumps(input) + + def deserialize(self, input_bytes): + return pickle.loads(input_bytes) \ No newline at end of file