Skip to content

Commit

Permalink
add event sequence id
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <liuweixa@redhat.com>
  • Loading branch information
skeeey committed Jan 2, 2024
1 parent 04e951f commit 60e9b10
Show file tree
Hide file tree
Showing 13 changed files with 624 additions and 14 deletions.
14 changes: 14 additions & 0 deletions cloudevents/generic/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ const (
// ExtensionResourceVersion is the cloud event extension key of the resource version.
ExtensionResourceVersion = "resourceversion"

// ExtensionSequenceID is the cloud event extension key of the event sequence ID.
// The event sequence id represents the order in which events occur on the agent/source.
ExtensionSequenceNumber = "sequenceid"

// ExtensionDeletionTimestamp is the cloud event extension key of the deletion timestamp.
ExtensionDeletionTimestamp = "deletiontimestamp"

Expand Down Expand Up @@ -159,6 +163,7 @@ type EventBuilder struct {
clusterName string
originalSource string
resourceID string
sequenceID string
resourceVersion *int64
eventType CloudEventsType
deletionTimestamp time.Time
Expand All @@ -181,6 +186,11 @@ func (b *EventBuilder) WithResourceVersion(resourceVersion int64) *EventBuilder
return b
}

func (b *EventBuilder) WithSequenceID(sequenceID string) *EventBuilder {
b.sequenceID = sequenceID
return b
}

func (b *EventBuilder) WithClusterName(clusterName string) *EventBuilder {
b.clusterName = clusterName
return b
Expand Down Expand Up @@ -211,6 +221,10 @@ func (b *EventBuilder) NewEvent() cloudevents.Event {
evt.SetExtension(ExtensionResourceVersion, *b.resourceVersion)
}

if len(b.sequenceID) != 0 {
evt.SetExtension(ExtensionSequenceNumber, b.sequenceID)
}

if len(b.clusterName) != 0 {
evt.SetExtension(ExtensionClusterName, b.clusterName)
}
Expand Down
10 changes: 7 additions & 3 deletions cloudevents/work/agent/codec/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"strconv"

"github.com/bwmarrin/snowflake"
cloudevents "github.com/cloudevents/sdk-go/v2"
cloudeventstypes "github.com/cloudevents/sdk-go/v2/types"

Expand All @@ -29,12 +30,14 @@ const (

// ManifestCodec is a codec to encode/decode a ManifestWork/cloudevent with ManifestBundle for an agent.
type ManifestCodec struct {
restMapper meta.RESTMapper
sequenceGenerator *snowflake.Node
restMapper meta.RESTMapper
}

func NewManifestCodec(restMapper meta.RESTMapper) *ManifestCodec {
func NewManifestCodec(sequenceGenerator *snowflake.Node, restMapper meta.RESTMapper) *ManifestCodec {
return &ManifestCodec{
restMapper: restMapper,
sequenceGenerator: sequenceGenerator,
restMapper: restMapper,
}
}

Expand Down Expand Up @@ -65,6 +68,7 @@ func (c *ManifestCodec) Encode(source string, eventType types.CloudEventsType, w

evt := types.NewEventBuilder(source, eventType).
WithResourceID(string(work.UID)).
WithSequenceID(c.sequenceGenerator.Generate().String()).
WithResourceVersion(resourceVersion).
WithClusterName(work.Namespace).
WithOriginalSource(originalSource).
Expand Down
23 changes: 19 additions & 4 deletions cloudevents/work/agent/codec/manifest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"testing"

"github.com/bwmarrin/snowflake"
cloudevents "github.com/cloudevents/sdk-go/v2"

corev1 "k8s.io/api/core/v1"
Expand All @@ -17,7 +18,12 @@ import (
)

func TestManifestEventDataType(t *testing.T) {
codec := NewManifestCodec(nil)
sequenceGenerator, err := snowflake.NewNode(1)
if err != nil {
t.Fatal(err)
}

codec := NewManifestCodec(sequenceGenerator, nil)

if codec.EventDataType() != payload.ManifestEventDataType {
t.Errorf("unexpected event data type %s", codec.EventDataType())
Expand Down Expand Up @@ -141,9 +147,14 @@ func TestManifestEncode(t *testing.T) {
},
}

sequenceGenerator, err := snowflake.NewNode(1)
if err != nil {
t.Fatal(err)
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
codec := NewManifestCodec(nil)
codec := NewManifestCodec(sequenceGenerator, nil)

_, err := codec.Encode("cluster1-work-agent", c.eventType, c.work)
if c.expectedErr {
Expand Down Expand Up @@ -281,10 +292,14 @@ func TestManifestDecode(t *testing.T) {
},
}

sequenceGenerator, err := snowflake.NewNode(1)
if err != nil {
t.Fatal(err)
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
codec := NewManifestCodec(nil)

codec := NewManifestCodec(sequenceGenerator, nil)
_, err := codec.Decode(c.event)
if c.expectedErr {
if err == nil {
Expand Down
12 changes: 9 additions & 3 deletions cloudevents/work/agent/codec/manifestbundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"strconv"

"github.com/bwmarrin/snowflake"
cloudevents "github.com/cloudevents/sdk-go/v2"
cloudeventstypes "github.com/cloudevents/sdk-go/v2/types"

Expand All @@ -17,10 +18,14 @@ import (
)

// ManifestBundleCodec is a codec to encode/decode a ManifestWork/cloudevent with ManifestBundle for an agent.
type ManifestBundleCodec struct{}
type ManifestBundleCodec struct {
sequenceGenerator *snowflake.Node
}

func NewManifestBundleCodec() *ManifestBundleCodec {
return &ManifestBundleCodec{}
func NewManifestBundleCodec(sequenceGenerator *snowflake.Node) *ManifestBundleCodec {
return &ManifestBundleCodec{
sequenceGenerator: sequenceGenerator,
}
}

// EventDataType always returns the event data type `io.open-cluster-management.works.v1alpha1.manifestbundles`.
Expand All @@ -46,6 +51,7 @@ func (c *ManifestBundleCodec) Encode(source string, eventType types.CloudEventsT

evt := types.NewEventBuilder(source, eventType).
WithResourceID(string(work.UID)).
WithSequenceID(c.sequenceGenerator.Generate().String()).
WithResourceVersion(resourceVersion).
WithClusterName(work.Namespace).
WithOriginalSource(originalSource).
Expand Down
22 changes: 19 additions & 3 deletions cloudevents/work/agent/codec/manifestbundle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package codec
import (
"testing"

"github.com/bwmarrin/snowflake"
cloudevents "github.com/cloudevents/sdk-go/v2"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -14,7 +15,12 @@ import (
)

func TestManifestBundleEventDataType(t *testing.T) {
codec := NewManifestBundleCodec()
sequenceGenerator, err := snowflake.NewNode(1)
if err != nil {
t.Fatal(err)
}

codec := NewManifestBundleCodec(sequenceGenerator)

if codec.EventDataType() != payload.ManifestBundleEventDataType {
t.Errorf("unexpected event data type %s", codec.EventDataType())
Expand Down Expand Up @@ -90,9 +96,14 @@ func TestManifestBundleEncode(t *testing.T) {
},
}

sequenceGenerator, err := snowflake.NewNode(1)
if err != nil {
t.Fatal(err)
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
codec := NewManifestBundleCodec()
codec := NewManifestBundleCodec(sequenceGenerator)

_, err := codec.Encode("cluster1-work-agent", c.eventType, c.work)
if c.expectedErr {
Expand Down Expand Up @@ -229,9 +240,14 @@ func TestManifestBundleDecode(t *testing.T) {
},
}

sequenceGenerator, err := snowflake.NewNode(1)
if err != nil {
t.Fatal(err)
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
codec := NewManifestBundleCodec()
codec := NewManifestBundleCodec(sequenceGenerator)

_, err := codec.Decode(c.event)
if c.expectedErr {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module open-cluster-management.io/api
go 1.20

require (
github.com/bwmarrin/snowflake v0.3.0
github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2 v2.0.0-20231030012137-0836a524e995
github.com/cloudevents/sdk-go/v2 v2.14.0
github.com/eclipse/paho.golang v0.11.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM=
github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ=
github.com/bwmarrin/snowflake v0.3.0 h1:xm67bEhkKh6ij1790JB83OujPR5CzNe8QuQqAgISZN0=
github.com/bwmarrin/snowflake v0.3.0/go.mod h1:NdZxfVWX+oR6y2K0o6qAYv6gIOP9rjG0/E9WsDpxqwE=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2 v2.0.0-20231030012137-0836a524e995 h1:pXyRKZ0T5WoB6X9QnHS5cEyW0Got39bNQIECxGUKVO4=
Expand Down
8 changes: 7 additions & 1 deletion test/integration/cloudevents/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,21 @@ package agent
import (
"context"

"github.com/bwmarrin/snowflake"
"open-cluster-management.io/api/cloudevents/generic/options/mqtt"
"open-cluster-management.io/api/cloudevents/work"
"open-cluster-management.io/api/cloudevents/work/agent/codec"
)

func StartWorkAgent(ctx context.Context, clusterName string, config *mqtt.MQTTOptions) (*work.ClientHolder, error) {
sequenceGenerator, err := snowflake.NewNode(1)
if err != nil {
return nil, err
}

clientHolder, err := work.NewClientHolderBuilder(clusterName, config).
WithClusterName(clusterName).
WithCodecs(codec.NewManifestCodec(nil)).
WithCodecs(codec.NewManifestCodec(sequenceGenerator, nil)).
NewClientHolder(ctx)
if err != nil {
return nil, err
Expand Down
12 changes: 12 additions & 0 deletions vendor/github.com/bwmarrin/snowflake/.travis.yml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 23 additions & 0 deletions vendor/github.com/bwmarrin/snowflake/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 60e9b10

Please sign in to comment.