-
Notifications
You must be signed in to change notification settings - Fork 225
Adding transformer field to GCPPubSubSource CRD. #398
Conversation
GitHub webhooks cannot be created otherwise.
/cc @Harwayne @vaikas-google |
…nto transformer # Conflicts: # contrib/gcppubsub/pkg/adapter/adapter.go # contrib/gcppubsub/pkg/reconciler/gcppubsubsource.go
Removed the default transformer for now until we further agree how it should behave. |
event.SetDataContentType(*cloudevents.StringOfApplicationJSON()) | ||
event.SetSource(a.source) | ||
event.SetData(m.Message()) | ||
|
||
// TODO: this will break when the upstream sender updates cloudevents versions. | ||
// The correct thing to do would be to convert the message to a cloudevent if it is one. | ||
et := sourcesv1alpha1.GcpPubSubSourceEventType | ||
if override, ok := m.Message().Attributes[eventTypeOverrideKey]; ok { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO we should remove this asap. If a particular source wants to do something like this (e.g., the gcs source), it should actually do it in the transformer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!! Couple of questions/suggestions.
// Update the event with the transformed one. | ||
event = *resp | ||
} else { | ||
logger.Warnf("cloud event %q was not transformed", event.ID()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this return an error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was leaving it as in case the transformer does not populate properly the response. If so, then we at least send the //google.pubsub.topic.publish event. As we might not control the transformer code, I'm assuming it can be wrong.
Do you prefer to return an error instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like the idea that the transformer can hide events. For example, events that are not on an allowed list or those that do not pass schema validation. I think our 'canonical' filtering of events is to return a 2XX with an empty body.
So I wouldn't return an error here, I would just return nil
.
TopicID: env.Topic, | ||
SinkURI: env.Sink, | ||
SubscriptionID: env.Subscription, | ||
TransformerURI: env.Transformer, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we at least validate the URI to make sure it's a valid URI?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we are not doing that for the sinkURI. I guess if it's not valid it will break when trying to create the httpClient. And it can be empty right? in that case we do not create any client...
The following is the coverage report on pkg/.
|
/lgtm |
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: nachocano, vaikas-google The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
@@ -64,11 +58,17 @@ func main() { | |||
log.Fatalf("Unable to create logger: %v", err) | |||
} | |||
|
|||
var env envConfig | |||
if err := envconfig.Process("", &env); err != nil { | |||
log.Fatal("Failed to process env var", zap.Error(err)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this a sugared logger? If so, it should be Fatalw
(my preference is just switch to a desugared logger to make sure these bugs don't occur).
googleCloudProject: | ||
type: string | ||
description: "ID of the Google Cloud Project that the PubSub Topic exists in." |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would add an example here, as historically GCP project name vs ID vs display name has been very confusing.
description: "ID of the Google Cloud Project that the PubSub Topic exists in. E.g. 'my-project-1234' rather than its display name, 'My Project' or its number '1234567890'."
serviceAccountName: | ||
type: string | ||
description: "Name of the ServiceAccount that will be used to run the Receive Adapter Deployment." |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Give an example. Is this the full email address? Just the name before the '@'?
event.SetDataContentType(*cloudevents.StringOfApplicationJSON()) | ||
event.SetSource(a.source) | ||
event.SetData(m.Message()) | ||
|
||
// TODO: this will break when the upstream sender updates cloudevents versions. | ||
// The correct thing to do would be to convert the message to a cloudevent if it is one. | ||
et := sourcesv1alpha1.GcpPubSubSourceEventType | ||
if override, ok := m.Message().Attributes[eventTypeOverrideKey]; ok { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree.
// Update the event with the transformed one. | ||
event = *resp | ||
} else { | ||
logger.Warnf("cloud event %q was not transformed", event.ID()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like the idea that the transformer can hide events. For example, events that are not on an allowed list or those that do not pass schema validation. I think our 'canonical' filtering of events is to return a 2XX with an empty body.
So I wouldn't return an error here, I would just return nil
.
@@ -83,7 +87,7 @@ const ( | |||
) | |||
|
|||
// GetGcpPubSubSource returns the GcpPubSub CloudEvent source value. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
GetGcpPubSubSource -> GcpPubSubSource
if len(uri) > 0 { | ||
gcpPubSubSourceCondSet.Manage(s).MarkTrue(GcpPubSubConditionTransformerProvided) | ||
} else { | ||
gcpPubSubSourceCondSet.Manage(s).MarkUnknown(GcpPubSubConditionTransformerProvided, "TransformerEmpty", "Transformer has resolved to empty.%s", "") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why the trailing "%s" that has an empty string arg?
Proposed Changes
Pending
Release Note