Skip to content
This repository has been archived by the owner on Dec 15, 2021. It is now read-only.

add NATS support #689

Merged
merged 11 commits into from
Apr 17, 2018
Merged

add NATS support #689

merged 11 commits into from
Apr 17, 2018

Conversation

murali-reddy
Copy link
Contributor

Issue Ref: #669

Description:

Add NATS support into Kubeless.

[PR Description]

This PR adds a first-cut integration of NATS into Kubeless. NATS provides wide variety of messaging options, this PR adds suport for https://nats.io/documentation/concepts/nats-queueing/

Please see the documentation included in the PR on how to use the functionality.

To the reviewer:

Commits are logically organised to make easier to follow.

8a38f8c6cb546cdbac8b3899f96341adc3e02fb9 (HEAD -> nats, origin/nats) NATS documentation
0fdb77e3f517c45fe5c74c978cbaa9fbb3bec9b7 fix tests
085e38f501126216c941ce3fb18194d0487444d7 build, CI and other misc stuff
0868bc84656ec1ff58955e5f3514609561232303 CLI support for NATS
90427b67f07810b9cc6df43c510bf11e3648c25d core nats trigger functionality
2254359450820910968bdd54646ffae72fa4f563 auto-generated code from code generator
b904a1d33d1fe325becfb8db380fee85ac6f607b API spec for nats trigger
2345b2a614c304adffd927aede296ad5caaf8e7a vendor nats-io/go-nats and nats-io/nuid

TODOs:

  • Ready to review
  • Automated Tests
  • Docs

Planning to add some more IT tests, but PR is good to get started with review.

if err != nil {
return err
}
if res.StatusCode != 200 {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should do something like defer resp.Body.Close() here to close response body and avoid leaking file descriptors ;)

func main() {
logrus.Infof("Running NATS controller version: %v", version.Version)
if err := rootCmd.Execute(); err != nil {
fmt.Println(err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor thing but log.Fatalf would print the timestamp of the hypothetical crash

if oldNatsTriggerObj.ResourceVersion == newNatsTriggerObj.ResourceVersion {
return false
}
if !reflect.DeepEqual(oldNatsTriggerObj.Spec.FunctionSelector, newNatsTriggerObj.Spec.FunctionSelector) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can use apiequality to compare the whole Spec object (see the function_controller)

return req, nil
}

func sendMessage(req *http.Request) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should move isJSON getHTTPReq and this sendMessage to a common package (since it is reused for Kafka)

req.Header.Add("event-time", timestamp.String())
req.Header.Add("event-namespace", "natstriggers.kubeless.io")
if isJSON(body) {
logrus.Infof("TRUE %v", body)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a debug message

req.Header.Add("Content-Type", "application/json")
req.Header.Add("event-type", "application/json")
} else {
logrus.Infof("FLASE %v", body)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

debug message

},
}

func publishTopic(topic, message, url string) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice solution :)

make nats-controller-image NATS_CONTROLLER_IMAGE=$NATS_CONTROLLER_IMAGE
./script/integration-tests minikube deployment
./script/integration-tests minikube nats
;;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the image should be done in the GKE scenario as well since it is the scenario that pushes the images and test everything

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also retag the image with the TRAVIS_TAG in the after_success hook, rename the file in the before_deploy and finally add the file in the deploy.file list

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@@ -75,6 +75,9 @@ basic)
kafka)
bats tests/integration-tests-kafka.bats
;;
nats)
bats tests/integration-tests-nats.bats
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add this tests as well in the * case (for GKE)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will do it in a seperate PR. Intergration tests for NATS are running as part of Minikube which should be sufficient for now.

@@ -0,0 +1,44 @@
#!/usr/bin/env bash
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you should be able to reuse the binary-controller script instead of adding a new one

Copy link
Contributor

@andresmgot andresmgot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated but we have release notes for Kafka in script/upload_release_notes. I think we should modify them to just point to this tutorial for Kafka and NATS

@@ -133,7 +133,11 @@ Kubeless also supports [ingress](https://kubernetes.io/docs/concepts/services-ne

## PubSub function

We provide several [PubSub runtimes](https://hub.docker.com/r/kubeless/), which has suffix `event-consumer`, which help you to quickly deploy your function with PubSub mechanism. The PubSub function will expect to consume input messages from a predefined Kafka topic which means Kafka is required. In Kubeless [release page](https://github.com/kubeless/kubeless/releases), you can find the manifest to quickly deploy a collection of Kafka and Zookeeper statefulsets. If you have a Kafka cluster already running in the same Kubernetes environment, you can also deploy PubSub function with it. Check out [this tutorial](/docs/use-existing-kafka) for more details how to do that.
We provide several [PubSub runtimes](https://hub.docker.com/r/kubeless/), which has suffix `event-consumer`, which help you to quickly deploy your function with PubSub mechanism. The PubSub function will expect to consume input messages from a predefined topic from a messaging system. Kubeless supports using events from Kafka and NATS messaging systems as Triggers.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We provide several PubSub runtimes, which has suffix event-consumer, which help you to quickly deploy your function with PubSub mechanism

that is no longer correct. I would say something like "all the available runtimes can be triggered with a PubSub mechanism..."


Above command will create NATS cluster IP service `nats.nats-io.svc.cluster.local:4222` which is the default URL Kubeless NATS trigger contoller expects.

At this point you are all set try Kubeless NATS triggers.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would continue the tutorial a little bit more and deploy a function using a NATS topic

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Expanded the quick start guide for NATS.

@andresmgot
Copy link
Contributor

We should also add an additional flag to the kubeless function deploy to complement the --trigger-topic flag and knows if the target is Kafka or NATS. Or just remove the --trigger-topic flag to clarify that PubSub triggers have a different life cycle and should be created/deleted independently. I would do the second since right know if you create and delete a function with --trigger-topic an orphaned KafkaTrigger is left.

var (
stopM map[string](chan struct{})
stoppedM map[string](chan struct{})
consumerM map[string]bool
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wondering but is this map ok in terms of concurrent access? It seems would be used when either syncNATSTrigger and FunctionAddedDeletedUpdated are called but not sure if they would both happen concurrently.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added locking there is a remote chance of race condition.

@murali-reddy
Copy link
Contributor Author

@andresmgot I have removed --trigger-topic flag in the kubeless function deploy call. Updated quick start guide for Kafka as well to reflect this.

Copy link
Contributor

@andresmgot andresmgot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! Changes LGTM. Great work :)

@murali-reddy murali-reddy merged commit 3dcd671 into master Apr 17, 2018
@murali-reddy
Copy link
Contributor Author

thanks for the review @andresmgot @wallyqs

@wallyqs you should be able to use the functionality by building image from the master. Ping me on slack if you run into any issue. Would be nice to get your feedback and possible future enhacements.

This was referenced Apr 17, 2018
@andresmgot andresmgot deleted the nats branch October 31, 2018 10:11
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants