Skip to content

Commit

Permalink
Added python api (apache#132)
Browse files Browse the repository at this point in the history
  • Loading branch information
srkukarni authored and sijie committed Mar 4, 2018
1 parent 624afe7 commit 8f571de
Show file tree
Hide file tree
Showing 3 changed files with 173 additions and 0 deletions.
91 changes: 91 additions & 0 deletions pulsar-functions/api-python/context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
"""context.py: Context defines context information available during
# processing of a request.
"""
from abc import abstractmethod

class Context(object):
"""Interface defining information available at process time"""
@abstractmethod
def get_message_id(self):
"""Return the messageid of the current message that we are processing"""
pass

@abstractmethod
def get_topic_name(self):
"""Returns the topic name of the message that we are processing"""
pass

@abstractmethod
def get_function_name(self):
"""Returns the function name that we are a part of"""
pass

@abstractmethod
def get_function_id(self):
"""Returns the function id that we are a part of"""
pass

@abstractmethod
def get_instance_id(self):
"""Returns the instance id that is executing the function"""
pass

@abstractmethod
def get_function_version(self):
"""Returns the version of function that we are executing"""
pass

@abstractmethod
def get_memory_limit(self):
"""Returns the memory limit in bytes of function that we are executing"""
pass

@abstractmethod
def get_time_budget_ms(self):
"""Returns the time limit in milliseconds of function that we are executing"""
pass

@abstractmethod
def get_remaining_time_ms(self):
"""Returns the remaining time in milliseconds of function that we are executing"""
pass

@abstractmethod
def get_logger(self):
"""Returns the logger object that can be used to do logging"""
pass

@abstractmethod
def get_user_config_value(self, key):
"""Returns the value of the user defined config. If the key doesnt exist return null instead"""
pass

@abstractmethod
def record_metric(self, metric_name, metric_value):
"""Records the metric_value. metric_value has to satisfy isinstance(metric_value, numbers.Number)"""
pass

@abstractmethod
def publish(self, topic_name, message, serde_class_name):
"""Publishes message to topic_name by first serializing the message using serde_class_name serde"""
pass
33 changes: 33 additions & 0 deletions pulsar-functions/api-python/pulsar_function.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
"""pulsar_function.py: This is the core interface of the function api.
# The process method is called for every message of the input topic of the
# function. The incoming input bytes are deserialized using the serde.
# The process function can optionally emit an output
"""
from abc import abstractmethod

class PulsarFunction(object):
"""Interface for Heron Metric"""
@abstractmethod
def process(self, input, context):
"""Process input message"""
pass
49 changes: 49 additions & 0 deletions pulsar-functions/api-python/serde.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
"""serde.py: SerDe defines the interface for serialization/deserialization.
# Everytime a message is read from pulsar topic, the serde is invoked to
# serialize the bytes into an object before invoking the process method.
# Anytime a python object needs to be written back to pulsar, it is
# serialized into bytes before writing.
"""
from abc import abstractmethod

import pickle

class SerDe(object):
"""Interface for Serialization/Deserialization"""
@abstractmethod
def serialize(self, input):
"""Serialize input message into bytes"""
pass

@abstractmethod
def deserialize(self, input_bytes):
"""Serialize input_bytes into an object"""
pass

class PickleSerDe(SerDe):
"""Pickle based serializer"""
def serialize(self, input):
return pickle.dumps(input)

def deserialize(self, input_bytes):
return pickle.loads(input_bytes)

0 comments on commit 8f571de

Please sign in to comment.