Skip to content

Commit

Permalink
NOISSUE - Add IsChannelOwner grpc endpoint (absmach#1366)
Browse files Browse the repository at this point in the history
* Add CanAccessChannelByOwner grpc endpoint

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Rename grpc endpoint to IsChannelOwner

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add tests for IsChannelOwner grpc

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Bump protoc version

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Make proto

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>
  • Loading branch information
darkodraskovic authored and fbugarski committed Mar 8, 2021
1 parent 1b80a89 commit d10fd44
Show file tree
Hide file tree
Showing 15 changed files with 674 additions and 52 deletions.
554 changes: 506 additions & 48 deletions auth.pb.go

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions auth.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import "google/protobuf/empty.proto";

service ThingsService {
rpc CanAccessByKey(AccessByKeyReq) returns (ThingID) {}
rpc IsChannelOwner(ChannelOwnerReq) returns (google.protobuf.Empty) {}
rpc CanAccessByID(AccessByIDReq) returns (google.protobuf.Empty) {}
rpc Identify(Token) returns (ThingID) {}
}
Expand All @@ -26,10 +27,19 @@ message AccessByKeyReq {
string chanID = 2;
}

message ChannelOwnerReq {
string owner = 1;
string chanID = 2;
}

message ThingID {
string value = 1;
}

message ChannelID {
string value = 1;
}

message AccessByIDReq {
string thingID = 1;
string chanID = 2;
Expand Down
4 changes: 4 additions & 0 deletions bootstrap/mocks/things.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,10 @@ func (svc *mainfluxThings) CanAccessByID(context.Context, string, string) error
panic("not implemented")
}

func (svc *mainfluxThings) IsChannelOwner(context.Context, string, string) error {
panic("not implemented")
}

func (svc *mainfluxThings) Identify(context.Context, string) (string, error) {
panic("not implemented")
}
Expand Down
4 changes: 4 additions & 0 deletions http/mocks/things.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ func (tc thingsClient) CanAccessByID(context.Context, *mainflux.AccessByIDReq, .
panic("not implemented")
}

func (tc thingsClient) IsChannelOwner(context.Context, *mainflux.ChannelOwnerReq, ...grpc.CallOption) (*empty.Empty, error) {
panic("not implemented")
}

func (tc thingsClient) Identify(ctx context.Context, req *mainflux.Token, opts ...grpc.CallOption) (*mainflux.ThingID, error) {
panic("not implemented")
}
4 changes: 4 additions & 0 deletions readers/mocks/things.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ func (svc thingsServiceMock) CanAccessByID(context.Context, *mainflux.AccessByID
panic("not implemented")
}

func (svc thingsServiceMock) IsChannelOwner(context.Context, *mainflux.ChannelOwnerReq, ...grpc.CallOption) (*empty.Empty, error) {
panic("not implemented")
}

func (svc thingsServiceMock) Identify(context.Context, *mainflux.Token, ...grpc.CallOption) (*mainflux.ThingID, error) {
panic("not implemented")
}
2 changes: 1 addition & 1 deletion scripts/ci.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# This script contains commands to be executed by the CI tool.
NPROC=$(nproc)
GO_VERSION=1.14.4
PROTOC_VERSION=3.11.4
PROTOC_VERSION=3.12.3
PROTOC_GEN_VERSION=v1.4.2
PROTOC_GOFAST_VERSION=v1.3.1
GRPC_VERSION=v1.29.1
Expand Down
25 changes: 25 additions & 0 deletions things/api/auth/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type grpcClient struct {
timeout time.Duration
canAccessByKey endpoint.Endpoint
canAccessByID endpoint.Endpoint
isChannelOwner endpoint.Endpoint
identify endpoint.Endpoint
}

Expand All @@ -47,6 +48,14 @@ func NewClient(conn *grpc.ClientConn, tracer opentracing.Tracer, timeout time.Du
decodeEmptyResponse,
empty.Empty{},
).Endpoint()),
isChannelOwner: kitot.TraceClient(tracer, "is_channel_owner")(kitgrpc.NewClient(
conn,
svcName,
"IsChannelOwner",
encodeIsChannelOwner,
decodeEmptyResponse,
empty.Empty{},
).Endpoint()),
identify: kitot.TraceClient(tracer, "identify")(kitgrpc.NewClient(
conn,
svcName,
Expand Down Expand Up @@ -86,6 +95,17 @@ func (client grpcClient) CanAccessByID(ctx context.Context, req *mainflux.Access
return &empty.Empty{}, er.err
}

func (client grpcClient) IsChannelOwner(ctx context.Context, req *mainflux.ChannelOwnerReq, _ ...grpc.CallOption) (*empty.Empty, error) {
ar := channelOwnerReq{owner: req.GetOwner(), chanID: req.GetChanID()}
res, err := client.isChannelOwner(ctx, ar)
if err != nil {
return nil, err
}

er := res.(emptyRes)
return &empty.Empty{}, er.err
}

func (client grpcClient) Identify(ctx context.Context, req *mainflux.Token, _ ...grpc.CallOption) (*mainflux.ThingID, error) {
ctx, cancel := context.WithTimeout(ctx, client.timeout)
defer cancel()
Expand All @@ -109,6 +129,11 @@ func encodeCanAccessByIDRequest(_ context.Context, grpcReq interface{}) (interfa
return &mainflux.AccessByIDReq{ThingID: req.thingID, ChanID: req.chanID}, nil
}

func encodeIsChannelOwner(_ context.Context, grpcReq interface{}) (interface{}, error) {
req := grpcReq.(channelOwnerReq)
return &mainflux.ChannelOwnerReq{Owner: req.owner, ChanID: req.chanID}, nil
}

func encodeIdentifyRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
req := grpcReq.(identifyReq)
return &mainflux.Token{Value: req.key}, nil
Expand Down
12 changes: 12 additions & 0 deletions things/api/auth/grpc/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,18 @@ func canAccessByIDEndpoint(svc things.Service) endpoint.Endpoint {
}
}

func isChannelOwnerEndpoint(svc things.Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
req := request.(channelOwnerReq)
if err := req.validate(); err != nil {
return nil, err
}

err := svc.IsChannelOwner(ctx, req.chanID, req.owner)
return emptyRes{err: err}, err
}
}

func identifyEndpoint(svc things.Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
req := request.(identifyReq)
Expand Down
13 changes: 13 additions & 0 deletions things/api/auth/grpc/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,19 @@ func (req accessByIDReq) validate() error {
return nil
}

type channelOwnerReq struct {
owner string
chanID string
}

func (req channelOwnerReq) validate() error {
if req.owner == "" || req.chanID == "" {
return things.ErrMalformedEntity
}

return nil
}

type identifyReq struct {
key string
}
Expand Down
20 changes: 20 additions & 0 deletions things/api/auth/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ var _ mainflux.ThingsServiceServer = (*grpcServer)(nil)
type grpcServer struct {
canAccessByKey kitgrpc.Handler
canAccessByID kitgrpc.Handler
isChannelOwner kitgrpc.Handler
identify kitgrpc.Handler
}

Expand All @@ -37,6 +38,11 @@ func NewServer(tracer opentracing.Tracer, svc things.Service) mainflux.ThingsSer
decodeCanAccessByIDRequest,
encodeEmptyResponse,
),
isChannelOwner: kitgrpc.NewServer(
isChannelOwnerEndpoint(svc),
decodeIsChannelOwnerRequest,
encodeEmptyResponse,
),
identify: kitgrpc.NewServer(
kitot.TraceServer(tracer, "identify")(identifyEndpoint(svc)),
decodeIdentifyRequest,
Expand All @@ -63,6 +69,15 @@ func (gs *grpcServer) CanAccessByID(ctx context.Context, req *mainflux.AccessByI
return res.(*empty.Empty), nil
}

func (gs *grpcServer) IsChannelOwner(ctx context.Context, req *mainflux.ChannelOwnerReq) (*empty.Empty, error) {
_, res, err := gs.isChannelOwner.ServeGRPC(ctx, req)
if err != nil {
return nil, encodeError(err)
}

return res.(*empty.Empty), nil
}

func (gs *grpcServer) Identify(ctx context.Context, req *mainflux.Token) (*mainflux.ThingID, error) {
_, res, err := gs.identify.ServeGRPC(ctx, req)
if err != nil {
Expand All @@ -82,6 +97,11 @@ func decodeCanAccessByIDRequest(_ context.Context, grpcReq interface{}) (interfa
return accessByIDReq{thingID: req.GetThingID(), chanID: req.GetChanID()}, nil
}

func decodeIsChannelOwnerRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
req := grpcReq.(*mainflux.ChannelOwnerReq)
return channelOwnerReq{owner: req.GetOwner(), chanID: req.GetChanID()}, nil
}

func decodeIdentifyRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
req := grpcReq.(*mainflux.Token)
return identifyReq{key: req.GetValue()}, nil
Expand Down
14 changes: 14 additions & 0 deletions things/api/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,20 @@ func (lm *loggingMiddleware) CanAccessByID(ctx context.Context, chanID, thingID

return lm.svc.CanAccessByID(ctx, chanID, thingID)
}

func (lm *loggingMiddleware) IsChannelOwner(ctx context.Context, owner, chanID string) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method is_channel_owner for channel %s and user %s took %s to complete", chanID, owner, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
}
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())

return lm.svc.IsChannelOwner(ctx, owner, chanID)
}

func (lm *loggingMiddleware) Identify(ctx context.Context, key string) (id string, err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method identify for token %s and thing %s took %s to complete", key, id, time.Since(begin))
Expand Down
9 changes: 9 additions & 0 deletions things/api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,15 @@ func (ms *metricsMiddleware) CanAccessByID(ctx context.Context, chanID, thingID
return ms.svc.CanAccessByID(ctx, chanID, thingID)
}

func (ms *metricsMiddleware) IsChannelOwner(ctx context.Context, owner, chanID string) error {
defer func(begin time.Time) {
ms.counter.With("method", "is_channel_owner").Add(1)
ms.latency.With("method", "is_channel_owner").Observe(time.Since(begin).Seconds())
}(time.Now())

return ms.svc.CanAccessByID(ctx, owner, chanID)
}

func (ms *metricsMiddleware) Identify(ctx context.Context, key string) (string, error) {
defer func(begin time.Time) {
ms.counter.With("method", "identify").Add(1)
Expand Down
4 changes: 4 additions & 0 deletions things/redis/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,10 @@ func (es eventStore) CanAccessByID(ctx context.Context, chanID string, thingID s
return es.svc.CanAccessByID(ctx, chanID, thingID)
}

func (es eventStore) IsChannelOwner(ctx context.Context, owner, chanID string) error {
return es.svc.IsChannelOwner(ctx, owner, chanID)
}

func (es eventStore) Identify(ctx context.Context, key string) (string, error) {
return es.svc.Identify(ctx, key)
}
Expand Down
16 changes: 13 additions & 3 deletions things/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ var (
// Service specifies an API that must be fullfiled by the domain service
// implementation, and all of its decorators (e.g. logging & metrics).
type Service interface {
// CreateThings adds a list of things to the user identified by the provided key.
// CreateThings adds things to the user identified by the provided key.
CreateThings(ctx context.Context, token string, things ...Thing) ([]Thing, error)

// UpdateThing updates the thing identified by the provided ID, that
Expand Down Expand Up @@ -82,7 +82,7 @@ type Service interface {
// belongs to the user identified by the provided key.
RemoveThing(ctx context.Context, token, id string) error

// CreateChannels adds a list of channels to the user identified by the provided key.
// CreateChannels adds channels to the user identified by the provided key.
CreateChannels(ctx context.Context, token string, channels ...Channel) ([]Channel, error)

// UpdateChannel updates the channel identified by the provided ID, that
Expand Down Expand Up @@ -121,6 +121,10 @@ type Service interface {
// the given thing and returns error if it cannot.
CanAccessByID(ctx context.Context, chanID, thingID string) error

// IsChannelOwner determines whether the channel can be accessed by
// the given user and returns error if it cannot.
IsChannelOwner(ctx context.Context, owner, chanID string) error

// Identify returns thing ID for given thing key.
Identify(ctx context.Context, key string) (string, error)

Expand Down Expand Up @@ -377,6 +381,13 @@ func (ts *thingsService) CanAccessByID(ctx context.Context, chanID, thingID stri
return nil
}

func (ts *thingsService) IsChannelOwner(ctx context.Context, owner, chanID string) error {
if _, err := ts.channels.RetrieveByID(ctx, owner, chanID); err != nil {
return err
}
return nil
}

func (ts *thingsService) Identify(ctx context.Context, key string) (string, error) {
id, err := ts.thingCache.ID(ctx, key)
if err == nil {
Expand Down Expand Up @@ -431,7 +442,6 @@ func (ts *thingsService) ListGroups(ctx context.Context, token string, level uin
return groups.GroupPage{}, errors.Wrap(ErrUnauthorizedAccess, err)
}
return ts.groups.RetrieveAll(ctx, level, gm)

}

func (ts *thingsService) ListParents(ctx context.Context, token string, childID string, level uint64, gm groups.Metadata) (groups.GroupPage, error) {
Expand Down
35 changes: 35 additions & 0 deletions things/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const (
wrongValue = "wrong-value"
email = "user@example.com"
token = "token"
token2 = "token2"
n = uint64(10)
)

Expand Down Expand Up @@ -1232,6 +1233,40 @@ func TestCanAccessByID(t *testing.T) {
}
}

func TestIsChannelOwner(t *testing.T) {
svc := newService(map[string]string{token: email, token2: "john.doe@email.net"})

chs, err := svc.CreateChannels(context.Background(), token, channel)
require.Nil(t, err, fmt.Sprintf("unexpected error: %s\n", err))
ownedCh := chs[0]
chs, err = svc.CreateChannels(context.Background(), token2, channel)
require.Nil(t, err, fmt.Sprintf("unexpected error: %s\n", err))
nonOwnedCh := chs[0]

cases := map[string]struct {
channel string
err error
}{
"user owns channel": {
channel: ownedCh.ID,
err: nil,
},
"user does not own channel": {
channel: nonOwnedCh.ID,
err: things.ErrNotFound,
},
"access to non-existing channel": {
channel: wrongID,
err: things.ErrNotFound,
},
}

for desc, tc := range cases {
err := svc.IsChannelOwner(context.Background(), email, tc.channel)
assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", desc, tc.err, err))
}
}

func TestIdentify(t *testing.T) {
svc := newService(map[string]string{token: email})

Expand Down

0 comments on commit d10fd44

Please sign in to comment.