Skip to content

Commit

Permalink
Merge branch 'master' into noissue-certs-dev
Browse files Browse the repository at this point in the history
  • Loading branch information
rodneyosodo committed Sep 20, 2023
2 parents 591c0f0 + 28f4965 commit 88e81c4
Show file tree
Hide file tree
Showing 82 changed files with 1,264 additions and 975 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
// SPDX-License-Identifier: Apache-2.0

// Package consumer contains events consumer for events
// published by Things service.
// published by Bootstrap service.
package consumer
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -6,94 +6,58 @@ package consumer
import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/go-redis/redis/v8"
"github.com/mainflux/mainflux/bootstrap"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/clients"
"github.com/mainflux/mainflux/pkg/events"
)

const (
stream = "mainflux.things"
group = "mainflux.bootstrap"

thingRemove = "thing.remove"
thingDisconnect = "policy.delete"

channelPrefix = "channel."
channelUpdate = channelPrefix + "update"
channelRemove = channelPrefix + "remove"

exists = "BUSYGROUP Consumer Group name already exists"
)

// Subscriber represents event source for things and channels provisioning.
type Subscriber interface {
// Subscribes to given subject and receives events.
Subscribe(ctx context.Context, subject string) error
}

type eventStore struct {
svc bootstrap.Service
client *redis.Client
consumer string
logger logger.Logger
type eventHandler struct {
svc bootstrap.Service
}

// NewEventStore returns new event store instance.
func NewEventStore(svc bootstrap.Service, client *redis.Client, consumer string, log logger.Logger) Subscriber {
return eventStore{
svc: svc,
client: client,
consumer: consumer,
logger: log,
// NewEventHandler returns new event store handler.
func NewEventHandler(svc bootstrap.Service) events.EventHandler {
return &eventHandler{
svc: svc,
}
}

func (es eventStore) Subscribe(ctx context.Context, subject string) error {
err := es.client.XGroupCreateMkStream(ctx, stream, group, "$").Err()
if err != nil && err.Error() != exists {
func (es *eventHandler) Handle(ctx context.Context, event events.Event) error {
msg, err := event.Encode()
if err != nil {
return err
}

for {
streams, err := es.client.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: group,
Consumer: es.consumer,
Streams: []string{stream, ">"},
Count: 100,
}).Result()
if err != nil || len(streams) == 0 {
continue
}

for _, msg := range streams[0].Messages {
event := msg.Values

var err error
switch event["operation"] {
case thingRemove:
rte := decodeRemoveThing(event)
err = es.svc.RemoveConfigHandler(ctx, rte.id)
case thingDisconnect:
dte := decodeDisconnectThing(event)
err = es.svc.DisconnectThingHandler(ctx, dte.channelID, dte.thingID)
case channelUpdate:
uce := decodeUpdateChannel(event)
err = es.handleUpdateChannel(ctx, uce)
case channelRemove:
rce := decodeRemoveChannel(event)
err = es.svc.RemoveChannelHandler(ctx, rce.id)
}
if err != nil {
es.logger.Warn(fmt.Sprintf("Failed to handle event sourcing: %s", err.Error()))
break
}
es.client.XAck(ctx, stream, group, msg.ID)
}
switch msg["operation"] {
case thingRemove:
rte := decodeRemoveThing(msg)
err = es.svc.RemoveConfigHandler(ctx, rte.id)
case thingDisconnect:
dte := decodeDisconnectThing(msg)
err = es.svc.DisconnectThingHandler(ctx, dte.channelID, dte.thingID)
case channelUpdate:
uce := decodeUpdateChannel(msg)
err = es.handleUpdateChannel(ctx, uce)
case channelRemove:
rce := decodeRemoveChannel(msg)
err = es.svc.RemoveChannelHandler(ctx, rce.id)
}
if err != nil {
return err
}

return nil
}

func decodeRemoveThing(event map[string]interface{}) removeEvent {
Expand Down Expand Up @@ -155,7 +119,7 @@ func decodeDisconnectThing(event map[string]interface{}) disconnectEvent {
}
}

func (es eventStore) handleUpdateChannel(ctx context.Context, uce updateChannelEvent) error {
func (es *eventHandler) handleUpdateChannel(ctx context.Context, uce updateChannelEvent) error {
channel := bootstrap.Channel{
ID: uce.id,
Name: uce.name,
Expand Down
6 changes: 6 additions & 0 deletions bootstrap/events/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0

// Package events provides the domain concept definitions needed to support
// bootstrap events functionality.
package events
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"strings"

"github.com/mainflux/mainflux/bootstrap"
"github.com/mainflux/mainflux/internal/clients/redis"
"github.com/mainflux/mainflux/pkg/events"
)

const (
Expand All @@ -34,14 +34,14 @@ const (
)

var (
_ redis.Event = (*configEvent)(nil)
_ redis.Event = (*removeConfigEvent)(nil)
_ redis.Event = (*bootstrapEvent)(nil)
_ redis.Event = (*changeStateEvent)(nil)
_ redis.Event = (*updateConnectionsEvent)(nil)
_ redis.Event = (*updateCertEvent)(nil)
_ redis.Event = (*listConfigsEvent)(nil)
_ redis.Event = (*removeHandlerEvent)(nil)
_ events.Event = (*configEvent)(nil)
_ events.Event = (*removeConfigEvent)(nil)
_ events.Event = (*bootstrapEvent)(nil)
_ events.Event = (*changeStateEvent)(nil)
_ events.Event = (*updateConnectionsEvent)(nil)
_ events.Event = (*updateCertEvent)(nil)
_ events.Event = (*listConfigsEvent)(nil)
_ events.Event = (*removeHandlerEvent)(nil)
)

type configEvent struct {
Expand Down
54 changes: 54 additions & 0 deletions bootstrap/events/producer/setup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0

package producer_test

import (
"context"
"fmt"
"log"
"os"
"testing"

"github.com/go-redis/redis/v8"
"github.com/ory/dockertest/v3"
)

var (
redisClient *redis.Client
redisURL string
)

func TestMain(m *testing.M) {
pool, err := dockertest.NewPool("")
if err != nil {
log.Fatalf("Could not connect to docker: %s", err)
}

container, err := pool.Run("redis", "7.2.0-alpine", nil)
if err != nil {
log.Fatalf("Could not start container: %s", err)
}

redisURL = fmt.Sprintf("redis://localhost:%s/0", container.GetPort("6379/tcp"))
opts, err := redis.ParseURL(redisURL)
if err != nil {
log.Fatalf("Could not parse redis URL: %s", err)
}

if err := pool.Retry(func() error {
redisClient = redis.NewClient(opts)

return redisClient.Ping(context.Background()).Err()
}); err != nil {
log.Fatalf("Could not connect to docker: %s", err)
}

code := m.Run()

if err := pool.Purge(container); err != nil {
log.Fatalf("Could not purge container: %s", err)
}

os.Exit(code)
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,36 +6,32 @@ package producer
import (
"context"

"github.com/go-redis/redis/v8"
"github.com/mainflux/mainflux/bootstrap"
mfredis "github.com/mainflux/mainflux/internal/clients/redis"
"github.com/mainflux/mainflux/pkg/events"
"github.com/mainflux/mainflux/pkg/events/redis"
)

const (
streamID = "mainflux.bootstrap"
streamLen = 1000
)
const streamID = "mainflux.bootstrap"

var _ bootstrap.Service = (*eventStore)(nil)

type eventStore struct {
mfredis.Publisher
svc bootstrap.Service
client *redis.Client
events.Publisher
svc bootstrap.Service
}

// NewEventStoreMiddleware returns wrapper around bootstrap service that sends
// events to event store.
func NewEventStoreMiddleware(ctx context.Context, svc bootstrap.Service, client *redis.Client) bootstrap.Service {
es := &eventStore{
svc: svc,
client: client,
Publisher: mfredis.NewEventStore(client, streamID, streamLen),
func NewEventStoreMiddleware(ctx context.Context, svc bootstrap.Service, url string) (bootstrap.Service, error) {
publisher, err := redis.NewPublisher(ctx, url, streamID)
if err != nil {
return nil, err

Check warning on line 28 in bootstrap/events/producer/streams.go

View check run for this annotation

Codecov / codecov/patch

bootstrap/events/producer/streams.go#L28

Added line #L28 was not covered by tests
}

go es.StartPublishingRoutine(ctx)

return es
return &eventStore{
svc: svc,
Publisher: publisher,
}, nil
}

func (es *eventStore) Add(ctx context.Context, token string, cfg bootstrap.Config) (bootstrap.Config, error) {
Expand Down
Loading

0 comments on commit 88e81c4

Please sign in to comment.