decoupling Function, Trigger and Runtimes #620
@nikhita it has been a while, but you might be interested in this PR and we would love to get an external review :) we know, it is big...
cmd/kubeless/function/deploy.go
@@ -138,11 +140,6 @@ var deployCmd = &cobra.Command{
			logrus.Fatal(err)
		}

		cpu, err := cmd.Flags().GetString("cpu")
where has this gone?
		Namespace: ns,
	}
	cronJobTrigger.ObjectMeta.Labels = map[string]string{
		"created-by": "kubeless",
we should probably add labels here and in the http trigger for function=funcName?
	}
	kafkaTrigger.ObjectMeta.Labels = map[string]string{
		"created-by": "kubeless",
		"function": funcName,
does this mean that we have a 1:1 relation between trigger objects and functions? That is fine for me for now, I just want to clarify if that is the intention
I think it means an N:1 triggers:functions relation (ie: multiple triggers can point to the same function)... Which we have anyway, given that we have a functionSelector below that (presumably) only matches the multiple instances/versions of a single Function. (Or at least, I can't think of a useful example where the functionSelector could be used to spray requests across a wide range of Functions.)
sorry for the confusion; the label setting above pointed out by @andresmgot has no meaning. I will remove it. The line below is what matters:
kafkaTrigger.Spec.FunctionSelector.MatchLabels = f.ObjectMeta.Labels
This is to keep the status quo of the kubeless CLI user experience below as it is today. The labels used for the function are the ones specified for the FunctionSelector:
kubeless function deploy pubsub-nodejs --trigger-topic s3-nodejs --runtime nodejs6 --handler pubsub-nodejs.handler --from-file nodejs/hellowithdata.js
Through the Kafka trigger CRD object/API you can still express the N:1 relationship between Kafka triggers and functions; a function can be part of multiple Kafka triggers as well.
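For illustration, a minimal sketch of that relation: two Kafka triggers on different topics selecting the same function via its labels. The spec type below is a local stand-in for the real kubeless API type, and the second topic name is made up:

```go
package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// kafkaTriggerSpec is a local stand-in for the KafkaTrigger spec
// discussed above; only the selector-related fields are shown.
type kafkaTriggerSpec struct {
	Topic            string
	FunctionSelector metav1.LabelSelector
}

func main() {
	// The labels the CLI sets on the Function object (see the snippet above).
	funcLabels := map[string]string{"created-by": "kubeless", "function": "pubsub-nodejs"}

	// Two triggers pointing at the same function: N:1 triggers:function.
	// A selector matching several functions' labels would fan out instead.
	t1 := kafkaTriggerSpec{Topic: "s3-nodejs", FunctionSelector: metav1.LabelSelector{MatchLabels: funcLabels}}
	t2 := kafkaTriggerSpec{Topic: "audit-events", FunctionSelector: metav1.LabelSelector{MatchLabels: funcLabels}}
	fmt.Println(t1.Topic, t2.Topic)
}
```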
		}
	}

	httpTrigger.Spec.RouteName = routeName
this line is not necessary, is it? shouldn't we erase the route name value?
docker/controller-manager/Dockerfile
@@ -0,0 +1,7 @@
FROM bitnami/minideb:jessie

LABEL maintainer="Nguyen Anh Tu <tuna@bitnami.com>"
we need to set the kubernetes group as the maintainer; I am not sure of the exact maintainer we need to use, but we should check this
kubeless.jsonnet
"httpImage": "kubeless/python@sha256:0f3b64b654df5326198e481cd26e73ecccd905aae60810fc9baea4dcbb61f697", | ||
"pubsubImage": "kubeless/python-event-consumer@sha256:1aeb6cef151222201abed6406694081db26fa2235d7ac128113dcebd8d73a6cb", | ||
"initImage": "tuna/python-pillow:2.7.11-alpine" | ||
"runtimeImage": "andresmgot/python@sha256:cdba8db2ab40c2e1b954f133c4b3c2dc8f63044e490bcccee88621a61d2d06d5", |
I need to move these images to the kubeless repo if they are going to be the final ones
I have not seen any issues so far, so it should be good to move them to the kubeless repo.
// HTTPTriggerSpec contains func specification
type HTTPTriggerSpec struct {
	FunctionName string         `json:"function-name"` // Name of the associated function
	ServiceSpec  v1.ServiceSpec `json:"service"`
maybe out of scope for this PR, but should we take the opportunity to change this to a Service instead of a ServiceSpec, to be able to set annotations?
Whereas I feel the opposite ;) why are we exposing the Service at all? I think an HTTP trigger should talk about HTTP host/port/path route that will be created (and optional TLS details) .. and that's about all.
Yes, but some people need the flexibility to define annotations in order to use some external services (slack link). Also note that this would be for advanced use only; normal usage would be the kubeless CLI with the flags --port, --hostname, ...
	}

	for _, function := range functions {
		if err != nil {
this err != nil check seems misplaced
	}

	for _, function := range functions {
		if err != nil {
the same applies to this err
	}
case err, more := <-consumer.Errors():
	if more {
		logrus.Fatalf("Error: %s\n", err.Error())
does this cause a crash in the consumer (and probably the shared pod) if there is an error with a message?
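For comparison, a sketch of a consume loop that logs consumer errors instead of calling logrus.Fatalf, so one bad message cannot take down co-located consumers. This assumes a sarama PartitionConsumer, which the quoted snippet resembles; the helper name is made up:

```go
package kafka

import (
	"github.com/Shopify/sarama"
	"github.com/sirupsen/logrus"
)

// consumeLoop drains messages and errors from a partition consumer.
// Errors are logged rather than fatal, so the shared process survives.
func consumeLoop(pc sarama.PartitionConsumer, handle func(*sarama.ConsumerMessage)) {
	for {
		select {
		case msg, more := <-pc.Messages():
			if !more {
				return // channel closed, consumer shutting down
			}
			handle(msg)
		case err, more := <-pc.Errors():
			if more {
				logrus.Errorf("Kafka consumer error: %v", err)
			}
		}
	}
}
```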
	timestamp := time.Now().UTC()
	req.SetHeader("event-id", fmt.Sprintf("kafka-consumer-%s-%s-%s", funcName, ns, timestamp.Format(time.RFC3339Nano)))
	req.SetHeader("event-type", "application/json")
we cannot assume that the message is JSON
cmd/kubeless/function/call.go
@@ -19,18 +19,28 @@ package function
import (
	"bytes"
	"fmt"
	"math/rand"
UnixNano() only gives us a few bytes of entropy between competing controllers that start at similar times, which will lead to more frequent event-id collisions than we otherwise expect. I suggest using crypto/rand instead, to draw on a better entropy source.
cmd/kubeless/function/call.go
func getRandString(n int) string {
	b := make([]byte, n)
	for i := range b {
		b[i] = letterBytes[rand.Intn(len(letterBytes))]
Is it important that event-ids are only from this quite-restricted base36 character set?
(I expected to see eg standard or url-safe base64 here)
not really, is there an easy (short) way of doing that? There may be a way I am unaware of.
import (
	"crypto/rand"
	"encoding/base64"
)

func getRandString(n int) (string, error) {
	b := make([]byte, n)
	if _, err := rand.Read(b); err != nil {
		return "", err
	}
	return base64.RawURLEncoding.EncodeToString(b), nil
}
cmd/kubeless/function/call.go
@@ -86,7 +96,7 @@ var callCmd = &cobra.Command{
		req = req.AbsPath(svc.ObjectMeta.SelfLink + ":" + port + "/proxy/")
	}
	timestamp := time.Now().UTC()
	req.SetHeader("event-id", fmt.Sprintf("cli-%s-%s-%s", version.VERSION, version.GITCOMMIT, timestamp.Format(time.RFC3339Nano)))
	req.SetHeader("event-id", fmt.Sprintf("kubeless-cli-%s", getRandString(11)))
why the prefix?
(Am I missing something? Is it important/useful to include the client identifier in the event id rather than putting that information in some other header?)
the function will receive only these headers (at least in the event parameter); that's why I am adding that prefix. It is true that there is the event-namespace header that includes the emitter info, so this is "duplicated". Would it be meaningful/useful to have just a random string here?
Unlike the other event headers, I think the event-id is going to be logged/stored/hashed/indexed by everything that looks at these event traces. If we can (eg) halve the number of bytes required for that, then everything becomes that tiny bit more efficient. (In particular, naive storage sharding can become quite confused by having variation only in the suffix).
It's a micro-optimisation at this point, for sure. But I dream big :)
kubeless-rbac.jsonnet
@@ -24,7 +24,7 @@ local controller_roles = [
	{
		apiGroups: ["kubeless.io"],
		resources: ["functions", "kafkatriggers", "httptriggers", "cronjobtriggers"],
		verbs: ["get", "list", "watch", "update"],
		verbs: ["create", "get", "list", "watch", "update", "patch"],
I see we use patch (which is essentially equivalent to update) for managing finalizers. When does the controller serviceaccount need to create these resources?
Which resources? Trigger objects?
the resources in this rbac rule (functions and triggers)
Got it. You are right, create should not be there. Will fix it.
pkg/utils/k8sutil.go
if len(patch) == 0 || string(patch) == "{}" {
	return nil
}
_, err = kubelessClient.KubelessV1beta1().KafkaTriggers(original.Namespace).Patch(original.Name, types.MergePatchType, patch)
Just repeating earlier comment: These patches need to be applied against a particular resource version (with retry on conflict detection) - otherwise we can race against another change between reading the "original" version and applying the patch.
So both update and patch have to be done against the latest resource version? Do you recall any core controllers in kube-controller-manager that do this retry-on-conflict logic? All I see is that they mostly just update without retry.
See all the calls to k8s.io/client-go/util/retry.RetryOnConflict. I presume the other Update calls just propagate the conflict error up the stack, at worst doing a full requeue and error retry later.
The example of a Patch I've just looked at (pkg/controller/volume/expand/cache.UpdatePVSize) does a patch that is idempotent, and generated+used within a single simple function, so the call to CreateTwoWayMergePatch() is just an expensive way to calculate a tiny/known patch object. This use of Patch() is fine, since the resulting patch doesn't actually depend on the original object anymore. If you have another specific example in mind, then I'd be happy to look into it.
I needed to stare at the kubeless code a lot longer to work out whether the kubeless patches were also idempotent. In particular, things like having time-separated calls to previousFunction, err := utils.GetFunctionCustomResource(kubelessClient, ...) and updatedFunction, err := getFunctionDescription(cli, ...) make me very nervous .. but it seems cli isn't actually used(?) in the latter. Staring harder at getFunctionDescription(), some specific constructs like the loop that incrementally appends volumes/volumeMounts for each secret will clearly produce the wrong patch if something else also adds a volume between when previousFunction was fetched and Patch() is called.
My point with the comparison to UpdatePVSize is that concluding that kubeless' use of Patch was not safe took about 20 minutes of looking for specific issues, since the code between Get+Patch is quite lengthy and complex. This is not a good pattern to follow for robust code (and it is also actually wrong in this case).
TL;DR; I will revert the last commit that does 3-way merge patches and keep the update.

> things like having time-separated calls to previousFunction, err := utils.GetFunctionCustomResource(kubelessClient, ...) and updatedFunction, err := getFunctionDescription(cli, ...) makes me very nervous ..

Agree. getFunctionDescription has grown big, and now that we have the deployment spec etc. as part of the Function spec, it's getting complex. But it's a CLI-only change: if the object changes while a CLI command execution is in flight, the CLI fails.

> My point with the comparison to UpdatePVSize is that concluding that kubeless' use of Patch was not safe took about 20mins of looking for specific issues, since the code between Get+Patch is quite lengthy and complex

Actually the controllers' use of patch is pretty straightforward and quick: all the controllers are doing is updating the Finalizer. Anyway, my intention was to avoid an update on top of a changed object resulting in "the object has been modified; please apply your changes to the latest version and try again". I guess I will just keep the retry logic on update failure.

All the core controllers simply perform updates; perhaps they will eventually sync to the desired state.
https://github.com/kubernetes/kubernetes/blob/master/pkg/controller/deployment/sync.go#L516

It also seems the use of a 3-way merge Patch may be overkill for a controller. Very likely all the controller will be dealing with is object meta and status; it does not have the same conflicts on updates to Spec that kubectl/the api-server have to deal with.

So to summarise, I will revert the last commit. Update seems more logical for the controller, but update alone is not sufficient; it needs retry on resource-version conflict.
Just to drive home the point that it's not about code length, but that the read/build-patch/patch pattern is dangerous and hard to get right (and thus almost certainly wrong in long code): all the AddFinalizer/RemoveFinalizer functions were also unsafe to use with Patch. Again, consider a race against another client that is also adding/removing finalizers: the final object has lost the change that A tried to make to the finalizer array. .. and to be clear: Patch() is fine, if used on a patch that is idempotent (was not derived from a particular snapshot of the object).
I am not sure how else we can achieve this with patch: we cannot use strategic merge patch with CRDs, and JSON merge patch replaces arrays, so we end up in the race conditions you mentioned. I am inclined to keep Update with retry; at least it's a very controlled change with no unseen side effects.
Agreed! For all the reasons you describe.
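For reference, a minimal sketch of that update-with-retry pattern using client-go's retry.RetryOnConflict; the clientset import path, the finalizer handling, and the function name here are illustrative, not the PR's exact code:

```go
package utils

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/util/retry"

	versioned "github.com/kubeless/kubeless/pkg/client/clientset/versioned"
)

// addFinalizer re-reads the object on every attempt, so the Update is
// always issued against the latest resourceVersion; on a 409 Conflict,
// RetryOnConflict re-runs the closure with fresh state.
func addFinalizer(c versioned.Interface, ns, name, finalizer string) error {
	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
		kt, err := c.KubelessV1beta1().KafkaTriggers(ns).Get(name, metav1.GetOptions{})
		if err != nil {
			return err
		}
		// A real implementation would skip the append if the
		// finalizer is already present.
		kt.ObjectMeta.Finalizers = append(kt.ObjectMeta.Finalizers, finalizer)
		_, err = c.KubelessV1beta1().KafkaTriggers(ns).Update(kt)
		return err
	})
}
```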
@andresmgot @anguslees I believe all issues are addressed. Fixed the failing GKE test run as well. Please take a look and see if there is anything that is a MUST fix for this PR to get LGTM. Also note that PR #630 will cover the new CLI for trigger create/update/delete and docs.
My +1 for merging this 🎉
let's continue adding further improvements in #630 or other PRs if necessary
Thanks for the review. If there are any suggestions I will follow them up in #630.
@@ -84,6 +85,12 @@ func createConsumerProcess(broker, topic, funcName, ns, consumerGroupID string,
	}
}

func isJSON(s string) bool {
func isJSON(s string) bool { |
Where is the unit test?
This function only works for JSON objects. (That may or may not be ok?)
let me submit a different PR for covering this file with unit tests
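As a starting point for those tests, a sketch of the distinction raised above (json.Valid accepts any JSON value, while decoding into a map accepts only objects); the function names are hypothetical, and the test would live in a _test.go file:

```go
package kafka

import (
	"encoding/json"
	"testing"
)

// isJSONObject mirrors the object-only behaviour questioned above:
// decoding into a map rejects arrays, strings, numbers, and booleans.
func isJSONObject(s string) bool {
	var obj map[string]interface{}
	return json.Unmarshal([]byte(s), &obj) == nil
}

// isAnyJSON accepts any valid JSON value (json.Valid needs Go 1.9+).
func isAnyJSON(s string) bool {
	return json.Valid([]byte(s))
}

func TestIsJSON(t *testing.T) {
	if !isJSONObject(`{"a": 1}`) {
		t.Error("expected JSON object to be accepted")
	}
	if isJSONObject(`[1, 2]`) {
		t.Error("object-only check should reject arrays")
	}
	if !isAnyJSON(`[1, 2]`) {
		t.Error("json.Valid should accept any JSON value")
	}
}
```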
cmd/kubeless/function/function.go
@@ -133,7 +134,7 @@ func getContentType(filename string, fbytes []byte) string {
	return contentType
}

func getFunctionDescription(cli kubernetes.Interface, funcName, ns, handler, file, deps, runtime, runtimeImage, mem, cpu, timeout string, envs, labels []string, secrets []string, defaultFunction kubelessApi.Function) (*kubelessApi.Function, error) {
func getFunctionDescription(cli kubernetes.Interface, funcName, ns, handler, file, deps, runtime, runtimeImage, mem, cpu, timeout string, port int32, headless bool, envs, labels []string, secrets []string, defaultFunction kubelessApi.Function) (*kubelessApi.Function, error) {
What's the use case for varying port/headless here?
I think the Service we're creating here is an internal trigger->function implementation detail, so altering the port doesn't make much sense - and if making it headless is better/required in some way, then surely we should do that unconditionally for all Functions...
In particular, if we allow changing this here, then I think every trigger needs to be modified to deal with surprise values of ports, and to ensure that load is still balanced across replicas appropriately for headless services. I raise this as a reason that we should not support this variation here.
> What's the use case for varying port/headless here? I think the Service we're creating here is an internal trigger->function implementation detail...

The port/headless change is for people that don't want to use a kubeless trigger and want to execute the function with their own methods. It is not so "internal".

> In particular, if we allow changing this here, then I think every trigger needs to be modified to deal with surprise values of ports

Agree, triggers should retrieve that information before calling the function (if possible).
> It is not so "internal".

Rather than exposing this internal detail, thus requiring it to be more flexible than is needed purely for kubeless' own use, and thus pushing complexity onto all the other parts of kubeless - I propose we keep the internal Service as internal and don't support this flexibility here.
If users want a headless Service, or a different port, or some other functionality (type=LoadBalancer?), then I propose we instead ask those users to create a new Service. This new Service can have whatever special bits they want and just use the same function label selector + targetPort=8080 (see the sketch below). We could even create a ServiceTrigger that did that for them...
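A sketch of that suggestion, with the service name, namespace handling, and label key assumed rather than taken from this PR:

```go
package sketch

import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
)

// externalService builds a user-managed Service that selects a
// function's pods directly and forwards to the runtime port 8080,
// leaving kubeless' internal Service untouched.
func externalService(funcName, ns string) *corev1.Service {
	return &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: funcName + "-external", Namespace: ns},
		Spec: corev1.ServiceSpec{
			Type:     corev1.ServiceTypeLoadBalancer,
			Selector: map[string]string{"function": funcName},
			Ports: []corev1.ServicePort{{
				Port:       80,
				TargetPort: intstr.FromInt(8080),
			}},
		},
	}
}
```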
If no one wants it (let me check in Slack and on the community call), maybe we should just remove it. It's been a pain to keep supporting it.
I can't answer that while the contents of this PR keep changing in new and surprising ways.
Issue Ref: [Issue number related to this PR or None]
The basic issues this PR addresses are detailed/discussed in this proposal.
Description:
This PR is the result of a collaborative development effort from @andresmgot, @ngtuna and @murali-reddy.
What problems does this PR address?
decoupling function and event sources
In the current design of Kubeless there is just a Function abstraction, represented as a CRD. This object is used to specify both function details (function handler, code, runtime, etc.) and event source details (the Kafka topic on which the function is to be called, the cron schedule, etc.).

This PR brings the notion of Triggers into Kubeless. A Trigger represents the details of an event source and the associated function that needs to be called when an event occurs. The existing supported event sources (Kafka, cron job, HTTP) are modelled as separate triggers, and each trigger gets its own CRD controller. This separation at the API layer provides a clean separation of concerns between functions and event sources.

This separation also enables Kubeless to support n:m associations between functions and event sources. For example, the Kafka trigger object uses a label selector to express the set of functions that need to be associated with the trigger.
decoupling event source listener and runtimes
In the current Kubeless architecture, the pod that is deployed to run the function also contains the event source listener code (e.g. a Kafka client subscribing to a particular topic). The problem with this approach is that we need to maintain a separate image for each language, language version, and event source combination.

With this PR, each pod that is deployed to run a function exposes an HTTP endpoint, irrespective of event source and language type. The function signature has also been altered to carry event data and context information.
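As an illustration of that dispatch model, a sketch of how an event-source listener can now call any function over HTTP; the in-cluster service DNS name and the exact header set are assumptions based on the snippets quoted earlier in this conversation:

```go
package events

import (
	"bytes"
	"fmt"
	"net/http"
	"time"
)

// forwardEvent posts one event payload to the function's in-cluster
// HTTP endpoint, regardless of event source or runtime language.
func forwardEvent(funcName, ns string, payload []byte) error {
	url := fmt.Sprintf("http://%s.%s.svc.cluster.local:8080", funcName, ns)
	req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(payload))
	if err != nil {
		return err
	}
	ts := time.Now().UTC()
	req.Header.Set("event-id", fmt.Sprintf("kafka-consumer-%s-%s-%s", funcName, ns, ts.Format(time.RFC3339Nano)))
	req.Header.Set("event-time", ts.String())
	// Note: per the review above, an event-type header should reflect
	// the actual payload type rather than assuming application/json.
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode >= http.StatusBadRequest {
		return fmt.Errorf("function %s returned %s", funcName, resp.Status)
	}
	return nil
}
```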
design changes
Some notes on the design to help the reviewer.
- Each trigger controller watches for Function object updates and takes an action only if required. For example, when a function is deleted, the Kafka trigger controller checks whether any Kafka trigger object is associated with it; if there is, it stops sending topic messages to the function service.
- Kubernetes Finalizers provide a mechanism through which an interested controller can request a soft delete of an API object. This PR leverages Finalizers to process the deleted objects. Please see the PVC protection controller for a reference.
- The core controller is now kubeless-controller-manager and includes the controllers for HTTP triggers and cronjob triggers.
- The Kafka trigger controller is split out as kafka-trigger-controller, which handles the Kafka trigger CRD.

Known Issue
kubeless function deploy implicitly creates the trigger objects as well. You can use the API to create trigger objects directly; CLI support will be added in a separate PR.

TODOs: