Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NOISSUE - Add IsChannelOwner grpc endpoint #1366

Merged
merged 5 commits into from
Feb 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
554 changes: 506 additions & 48 deletions auth.pb.go

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions auth.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import "google/protobuf/empty.proto";

service ThingsService {
rpc CanAccessByKey(AccessByKeyReq) returns (ThingID) {}
rpc IsChannelOwner(ChannelOwnerReq) returns (google.protobuf.Empty) {}
rpc CanAccessByID(AccessByIDReq) returns (google.protobuf.Empty) {}
rpc Identify(Token) returns (ThingID) {}
}
Expand All @@ -26,10 +27,19 @@ message AccessByKeyReq {
string chanID = 2;
}

message ChannelOwnerReq {
string owner = 1;
string chanID = 2;
}

message ThingID {
string value = 1;
}

message ChannelID {
string value = 1;
}

message AccessByIDReq {
string thingID = 1;
string chanID = 2;
Expand Down
4 changes: 4 additions & 0 deletions bootstrap/mocks/things.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,10 @@ func (svc *mainfluxThings) CanAccessByID(context.Context, string, string) error
panic("not implemented")
}

func (svc *mainfluxThings) IsChannelOwner(context.Context, string, string) error {
panic("not implemented")
}

func (svc *mainfluxThings) Identify(context.Context, string) (string, error) {
panic("not implemented")
}
Expand Down
4 changes: 4 additions & 0 deletions http/mocks/things.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ func (tc thingsClient) CanAccessByID(context.Context, *mainflux.AccessByIDReq, .
panic("not implemented")
}

func (tc thingsClient) IsChannelOwner(context.Context, *mainflux.ChannelOwnerReq, ...grpc.CallOption) (*empty.Empty, error) {
panic("not implemented")
}

func (tc thingsClient) Identify(ctx context.Context, req *mainflux.Token, opts ...grpc.CallOption) (*mainflux.ThingID, error) {
panic("not implemented")
}
4 changes: 4 additions & 0 deletions readers/mocks/things.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ func (svc thingsServiceMock) CanAccessByID(context.Context, *mainflux.AccessByID
panic("not implemented")
}

func (svc thingsServiceMock) IsChannelOwner(context.Context, *mainflux.ChannelOwnerReq, ...grpc.CallOption) (*empty.Empty, error) {
panic("not implemented")
}

func (svc thingsServiceMock) Identify(context.Context, *mainflux.Token, ...grpc.CallOption) (*mainflux.ThingID, error) {
panic("not implemented")
}
2 changes: 1 addition & 1 deletion scripts/ci.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# This script contains commands to be executed by the CI tool.
NPROC=$(nproc)
GO_VERSION=1.14.4
PROTOC_VERSION=3.11.4
PROTOC_VERSION=3.12.3
PROTOC_GEN_VERSION=v1.4.2
PROTOC_GOFAST_VERSION=v1.3.1
GRPC_VERSION=v1.29.1
Expand Down
25 changes: 25 additions & 0 deletions things/api/auth/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type grpcClient struct {
timeout time.Duration
canAccessByKey endpoint.Endpoint
canAccessByID endpoint.Endpoint
isChannelOwner endpoint.Endpoint
identify endpoint.Endpoint
}

Expand All @@ -47,6 +48,14 @@ func NewClient(conn *grpc.ClientConn, tracer opentracing.Tracer, timeout time.Du
decodeEmptyResponse,
empty.Empty{},
).Endpoint()),
isChannelOwner: kitot.TraceClient(tracer, "is_channel_owner")(kitgrpc.NewClient(
conn,
svcName,
"IsChannelOwner",
encodeIsChannelOwner,
decodeEmptyResponse,
empty.Empty{},
).Endpoint()),
identify: kitot.TraceClient(tracer, "identify")(kitgrpc.NewClient(
conn,
svcName,
Expand Down Expand Up @@ -86,6 +95,17 @@ func (client grpcClient) CanAccessByID(ctx context.Context, req *mainflux.Access
return &empty.Empty{}, er.err
}

func (client grpcClient) IsChannelOwner(ctx context.Context, req *mainflux.ChannelOwnerReq, _ ...grpc.CallOption) (*empty.Empty, error) {
ar := channelOwnerReq{owner: req.GetOwner(), chanID: req.GetChanID()}
res, err := client.isChannelOwner(ctx, ar)
if err != nil {
return nil, err
}

er := res.(emptyRes)
return &empty.Empty{}, er.err
}

func (client grpcClient) Identify(ctx context.Context, req *mainflux.Token, _ ...grpc.CallOption) (*mainflux.ThingID, error) {
ctx, cancel := context.WithTimeout(ctx, client.timeout)
defer cancel()
Expand All @@ -109,6 +129,11 @@ func encodeCanAccessByIDRequest(_ context.Context, grpcReq interface{}) (interfa
return &mainflux.AccessByIDReq{ThingID: req.thingID, ChanID: req.chanID}, nil
}

func encodeIsChannelOwner(_ context.Context, grpcReq interface{}) (interface{}, error) {
req := grpcReq.(channelOwnerReq)
return &mainflux.ChannelOwnerReq{Owner: req.owner, ChanID: req.chanID}, nil
}

func encodeIdentifyRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
req := grpcReq.(identifyReq)
return &mainflux.Token{Value: req.key}, nil
Expand Down
12 changes: 12 additions & 0 deletions things/api/auth/grpc/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,18 @@ func canAccessByIDEndpoint(svc things.Service) endpoint.Endpoint {
}
}

func isChannelOwnerEndpoint(svc things.Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
req := request.(channelOwnerReq)
if err := req.validate(); err != nil {
return nil, err
}

err := svc.IsChannelOwner(ctx, req.chanID, req.owner)
return emptyRes{err: err}, err
}
}

func identifyEndpoint(svc things.Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
req := request.(identifyReq)
Expand Down
13 changes: 13 additions & 0 deletions things/api/auth/grpc/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,19 @@ func (req accessByIDReq) validate() error {
return nil
}

type channelOwnerReq struct {
owner string
chanID string
}

func (req channelOwnerReq) validate() error {
if req.owner == "" || req.chanID == "" {
return things.ErrMalformedEntity
}

return nil
}

type identifyReq struct {
key string
}
Expand Down
20 changes: 20 additions & 0 deletions things/api/auth/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ var _ mainflux.ThingsServiceServer = (*grpcServer)(nil)
type grpcServer struct {
canAccessByKey kitgrpc.Handler
canAccessByID kitgrpc.Handler
isChannelOwner kitgrpc.Handler
identify kitgrpc.Handler
}

Expand All @@ -37,6 +38,11 @@ func NewServer(tracer opentracing.Tracer, svc things.Service) mainflux.ThingsSer
decodeCanAccessByIDRequest,
encodeEmptyResponse,
),
isChannelOwner: kitgrpc.NewServer(
isChannelOwnerEndpoint(svc),
decodeIsChannelOwnerRequest,
encodeEmptyResponse,
),
identify: kitgrpc.NewServer(
kitot.TraceServer(tracer, "identify")(identifyEndpoint(svc)),
decodeIdentifyRequest,
Expand All @@ -63,6 +69,15 @@ func (gs *grpcServer) CanAccessByID(ctx context.Context, req *mainflux.AccessByI
return res.(*empty.Empty), nil
}

func (gs *grpcServer) IsChannelOwner(ctx context.Context, req *mainflux.ChannelOwnerReq) (*empty.Empty, error) {
_, res, err := gs.isChannelOwner.ServeGRPC(ctx, req)
if err != nil {
return nil, encodeError(err)
}

return res.(*empty.Empty), nil
}

func (gs *grpcServer) Identify(ctx context.Context, req *mainflux.Token) (*mainflux.ThingID, error) {
_, res, err := gs.identify.ServeGRPC(ctx, req)
if err != nil {
Expand All @@ -82,6 +97,11 @@ func decodeCanAccessByIDRequest(_ context.Context, grpcReq interface{}) (interfa
return accessByIDReq{thingID: req.GetThingID(), chanID: req.GetChanID()}, nil
}

func decodeIsChannelOwnerRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
req := grpcReq.(*mainflux.ChannelOwnerReq)
return channelOwnerReq{owner: req.GetOwner(), chanID: req.GetChanID()}, nil
}

func decodeIdentifyRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
req := grpcReq.(*mainflux.Token)
return identifyReq{key: req.GetValue()}, nil
Expand Down
14 changes: 14 additions & 0 deletions things/api/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,20 @@ func (lm *loggingMiddleware) CanAccessByID(ctx context.Context, chanID, thingID

return lm.svc.CanAccessByID(ctx, chanID, thingID)
}

func (lm *loggingMiddleware) IsChannelOwner(ctx context.Context, owner, chanID string) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method is_channel_owner for channel %s and user %s took %s to complete", chanID, owner, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
}
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())

return lm.svc.IsChannelOwner(ctx, owner, chanID)
}

func (lm *loggingMiddleware) Identify(ctx context.Context, key string) (id string, err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method identify for token %s and thing %s took %s to complete", key, id, time.Since(begin))
Expand Down
9 changes: 9 additions & 0 deletions things/api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,15 @@ func (ms *metricsMiddleware) CanAccessByID(ctx context.Context, chanID, thingID
return ms.svc.CanAccessByID(ctx, chanID, thingID)
}

func (ms *metricsMiddleware) IsChannelOwner(ctx context.Context, owner, chanID string) error {
defer func(begin time.Time) {
ms.counter.With("method", "is_channel_owner").Add(1)
ms.latency.With("method", "is_channel_owner").Observe(time.Since(begin).Seconds())
}(time.Now())

return ms.svc.CanAccessByID(ctx, owner, chanID)
}

func (ms *metricsMiddleware) Identify(ctx context.Context, key string) (string, error) {
defer func(begin time.Time) {
ms.counter.With("method", "identify").Add(1)
Expand Down
4 changes: 4 additions & 0 deletions things/redis/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,10 @@ func (es eventStore) CanAccessByID(ctx context.Context, chanID string, thingID s
return es.svc.CanAccessByID(ctx, chanID, thingID)
}

func (es eventStore) IsChannelOwner(ctx context.Context, owner, chanID string) error {
return es.svc.IsChannelOwner(ctx, owner, chanID)
}

func (es eventStore) Identify(ctx context.Context, key string) (string, error) {
return es.svc.Identify(ctx, key)
}
Expand Down
16 changes: 13 additions & 3 deletions things/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ var (
// Service specifies an API that must be fullfiled by the domain service
// implementation, and all of its decorators (e.g. logging & metrics).
type Service interface {
// CreateThings adds a list of things to the user identified by the provided key.
// CreateThings adds things to the user identified by the provided key.
CreateThings(ctx context.Context, token string, things ...Thing) ([]Thing, error)

// UpdateThing updates the thing identified by the provided ID, that
Expand Down Expand Up @@ -82,7 +82,7 @@ type Service interface {
// belongs to the user identified by the provided key.
RemoveThing(ctx context.Context, token, id string) error

// CreateChannels adds a list of channels to the user identified by the provided key.
// CreateChannels adds channels to the user identified by the provided key.
CreateChannels(ctx context.Context, token string, channels ...Channel) ([]Channel, error)

// UpdateChannel updates the channel identified by the provided ID, that
Expand Down Expand Up @@ -121,6 +121,10 @@ type Service interface {
// the given thing and returns error if it cannot.
CanAccessByID(ctx context.Context, chanID, thingID string) error

// IsChannelOwner determines whether the channel can be accessed by
// the given user and returns error if it cannot.
IsChannelOwner(ctx context.Context, owner, chanID string) error

// Identify returns thing ID for given thing key.
Identify(ctx context.Context, key string) (string, error)

Expand Down Expand Up @@ -377,6 +381,13 @@ func (ts *thingsService) CanAccessByID(ctx context.Context, chanID, thingID stri
return nil
}

func (ts *thingsService) IsChannelOwner(ctx context.Context, owner, chanID string) error {
if _, err := ts.channels.RetrieveByID(ctx, owner, chanID); err != nil {
return err
}
return nil
}

func (ts *thingsService) Identify(ctx context.Context, key string) (string, error) {
id, err := ts.thingCache.ID(ctx, key)
if err == nil {
Expand Down Expand Up @@ -431,7 +442,6 @@ func (ts *thingsService) ListGroups(ctx context.Context, token string, level uin
return groups.GroupPage{}, errors.Wrap(ErrUnauthorizedAccess, err)
}
return ts.groups.RetrieveAll(ctx, level, gm)

}

func (ts *thingsService) ListParents(ctx context.Context, token string, childID string, level uint64, gm groups.Metadata) (groups.GroupPage, error) {
Expand Down
35 changes: 35 additions & 0 deletions things/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const (
wrongValue = "wrong-value"
email = "user@example.com"
token = "token"
token2 = "token2"
n = uint64(10)
)

Expand Down Expand Up @@ -1232,6 +1233,40 @@ func TestCanAccessByID(t *testing.T) {
}
}

func TestIsChannelOwner(t *testing.T) {
svc := newService(map[string]string{token: email, token2: "john.doe@email.net"})

chs, err := svc.CreateChannels(context.Background(), token, channel)
require.Nil(t, err, fmt.Sprintf("unexpected error: %s\n", err))
ownedCh := chs[0]
chs, err = svc.CreateChannels(context.Background(), token2, channel)
require.Nil(t, err, fmt.Sprintf("unexpected error: %s\n", err))
nonOwnedCh := chs[0]

cases := map[string]struct {
channel string
err error
}{
"user owns channel": {
channel: ownedCh.ID,
err: nil,
},
"user does not own channel": {
channel: nonOwnedCh.ID,
err: things.ErrNotFound,
},
"access to non-existing channel": {
channel: wrongID,
err: things.ErrNotFound,
},
}

for desc, tc := range cases {
err := svc.IsChannelOwner(context.Background(), email, tc.channel)
assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", desc, tc.err, err))
}
}

func TestIdentify(t *testing.T) {
svc := newService(map[string]string{token: email})

Expand Down