diff --git a/go/stream/message.go b/go/stream/message.go index c733177..2f808ee 100644 --- a/go/stream/message.go +++ b/go/stream/message.go @@ -25,6 +25,7 @@ const ( mapKeySourceType = "sourceType" mapKeyWebhookFailureReason = "webhookFailureReason" mapKeyStage = "stage" + mapKeyCompression = "compression" ) type Message struct { @@ -47,6 +48,7 @@ type MessageProperties struct { SourceType string `json:"sourceType,omitempty"` // optional WebhookFailureReason string `json:"webhookFailureReason,omitempty"` // optional Stage string `json:"stage,omitempty"` // optional + Compression string `json:"compression,omitempty"` // optional } // FromMapProperties converts a property map to MessageProperties. @@ -71,6 +73,7 @@ func FromMapProperties(properties map[string]string) (MessageProperties, error) SourceType: properties[mapKeySourceType], WebhookFailureReason: properties[mapKeyWebhookFailureReason], Stage: properties[mapKeyStage], + Compression: properties[mapKeyCompression], }, nil } @@ -88,6 +91,7 @@ func ToMapProperties(properties MessageProperties) map[string]string { mapKeySourceJobRunID: properties.SourceJobRunID, mapKeySourceTaskRunID: properties.SourceTaskRunID, mapKeyTraceID: properties.TraceID, + mapKeyCompression: properties.Compression, } if properties.Stage == StageWebhook { m[mapKeySourceType] = properties.SourceType diff --git a/go/stream/message_test.go b/go/stream/message_test.go index 8bf9b2e..a8e74e0 100644 --- a/go/stream/message_test.go +++ b/go/stream/message_test.go @@ -24,6 +24,7 @@ func TestMessage(t *testing.T) { "sourceJobRunID": "sourceJobRunID", "sourceTaskRunID": "sourceTaskRunID", "traceID": "traceID", + "compression": "some-serialized-compression-settings", } msg, err := stream.FromMapProperties(input) @@ -41,6 +42,7 @@ func TestMessage(t *testing.T) { SourceJobRunID: "sourceJobRunID", SourceTaskRunID: "sourceTaskRunID", TraceID: "traceID", + Compression: "some-serialized-compression-settings", }, msg) propertiesOut := stream.ToMapProperties(msg) @@ -71,6 +73,7 @@ func TestMessage(t *testing.T) { "sourceType": "sourceType", "webhookFailureReason": "webhookFailureReason", "stage": stream.StageWebhook, + "compression": "some-serialized-compression-settings", } msg, err := stream.FromMapProperties(input) @@ -91,6 +94,7 @@ func TestMessage(t *testing.T) { SourceType: "sourceType", WebhookFailureReason: "webhookFailureReason", Stage: stream.StageWebhook, + Compression: "some-serialized-compression-settings", }, msg) propertiesOut := stream.ToMapProperties(msg) @@ -111,7 +115,8 @@ func TestMessage(t *testing.T) { "requestIP": "10.29.13.20", "sourceJobRunID": "sourceJobRunID", "sourceTaskRunID": "sourceTaskRunID", - "traceID": "traceID" + "traceID": "traceID", + "compression": "some-serialized-compression-settings" }, "payload": { "key": "value", @@ -138,6 +143,7 @@ func TestMessage(t *testing.T) { SourceJobRunID: "sourceJobRunID", SourceTaskRunID: "sourceTaskRunID", TraceID: "traceID", + Compression: "some-serialized-compression-settings", }, Payload: json.RawMessage(`{ "key": "value", @@ -170,7 +176,8 @@ func TestMessage(t *testing.T) { "traceID": "traceID", "sourceType": "sourceType", "webhookFailureReason": "webhookFailureReason", - "stage": "webhook" + "stage": "webhook", + "compression": "some-serialized-compression-settings" }, "payload": { "key": "value", @@ -199,6 +206,7 @@ func TestMessage(t *testing.T) { TraceID: "traceID", SourceType: "sourceType", WebhookFailureReason: "webhookFailureReason", Stage: stream.StageWebhook, + Compression: "some-serialized-compression-settings", }, Payload: json.RawMessage(`{ "key": "value",