Skip to content
This repository has been archived by the owner on Jun 4, 2021. It is now read-only.

Fixing issue with incorrect conversion of sqs SentTimestamp #996

Merged
merged 3 commits into from
Mar 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions awssqs/pkg/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,18 +169,19 @@ func (a *Adapter) receiveMessage(ctx context.Context, m *sqs.Message, ack func()
}

func (a *Adapter) makeEvent(m *sqs.Message) (*cloudevents.Event, error) {

// TODO verify the timestamp conversion
timestamp, err := strconv.ParseInt(*m.Attributes["SentTimestamp"], 10, 64)
if err != nil {
if err == nil {
//Convert to nanoseconds as sqs SentTimestamp is millisecond
timestamp = timestamp * int64(1000000)
} else {
timestamp = time.Now().UnixNano()
}

event := cloudevents.NewEvent(cloudevents.VersionV1)
event.SetID(*m.MessageId)
event.SetType(sourcesv1alpha1.AwsSqsSourceEventType)
event.SetSource(types.ParseURIRef(a.QueueURL).String())
event.SetTime(time.Unix(timestamp, 0))
event.SetTime(time.Unix(0, timestamp))

if err := event.SetData(m); err != nil {
return nil, err
Expand Down
16 changes: 10 additions & 6 deletions awssqs/pkg/adapter/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,22 @@ import (
)

func TestPostMessage_ServeHTTP(t *testing.T) {
timestamp := "1542107977907705474"
//Millisecond unix timestamp
timestamp := "1542107977907"
testCases := map[string]struct {
sink func(http.ResponseWriter, *http.Request)
reqBody string
error bool
}{
"happy": {
sink: sinkAccepted,
reqBody: `{"Attributes":{"SentTimestamp":"1542107977907705474"},"Body":"The body","MD5OfBody":null,"MD5OfMessageAttributes":null,"MessageAttributes":null,"MessageId":"ABC01","ReceiptHandle":null}`,
sink: sinkAccepted,
//Millisecond unix timestamp
reqBody: `{"Attributes":{"SentTimestamp":"1542107977907"},"Body":"The body","MD5OfBody":null,"MD5OfMessageAttributes":null,"MessageAttributes":null,"MessageId":"ABC01","ReceiptHandle":null}`,
},
"rejected": {
sink: sinkRejected,
reqBody: `{"Attributes":{"SentTimestamp":"1542107977907705474"},"Body":"The body","MD5OfBody":null,"MD5OfMessageAttributes":null,"MessageAttributes":null,"MessageId":"ABC01","ReceiptHandle":null}`,
sink: sinkRejected,
//Millisecond unix timestamp
reqBody: `{"Attributes":{"SentTimestamp":"1542107977907"},"Body":"The body","MD5OfBody":null,"MD5OfMessageAttributes":null,"MessageAttributes":null,"MessageId":"ABC01","ReceiptHandle":null}`,
error: true,
},
}
Expand Down Expand Up @@ -105,7 +108,8 @@ func TestReceiveMessage_ServeHTTP(t *testing.T) {

id := "ABC01"
body := "the body"
timestamp := "1542107977907705474"
//Millisecond unix timestamp
timestamp := "1542107977907"
m := &sqs.Message{
MessageId: &id,
Body: &body,
Expand Down