Skip to content

Commit

Permalink
DATA-1786 Support filtering capture data with modular camera component (
Browse files Browse the repository at this point in the history
  • Loading branch information
kaywux authored Aug 17, 2023
1 parent 772e5eb commit 93a54c7
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 0 deletions.
15 changes: 15 additions & 0 deletions components/camera/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
goutils "go.viam.com/utils"
"go.viam.com/utils/rpc"

"go.viam.com/rdk/data"
"go.viam.com/rdk/pointcloud"
"go.viam.com/rdk/protoutils"
"go.viam.com/rdk/resource"
Expand Down Expand Up @@ -64,9 +65,16 @@ func (c *client) Read(ctx context.Context) (image.Image, func(), error) {
defer span.End()
mimeType := gostream.MIMETypeHint(ctx, "")
expectedType, _ := utils.CheckLazyMIMEType(mimeType)

extra, err := data.GetExtraFromContext(ctx)
if err != nil {
return nil, nil, err
}

resp, err := c.client.GetImage(ctx, &pb.GetImageRequest{
Name: c.name,
MimeType: expectedType,
Extra: extra,
})
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -177,9 +185,16 @@ func (c *client) NextPointCloud(ctx context.Context) (pointcloud.PointCloud, err
defer span.End()

ctx, getPcdSpan := trace.StartSpan(ctx, "camera::client::NextPointCloud::GetPointCloud")

extra, err := data.GetExtraFromContext(ctx)
if err != nil {
return nil, err
}

resp, err := c.client.GetPointCloud(ctx, &pb.GetPointCloudRequest{
Name: c.name,
MimeType: utils.MimeTypePCD,
Extra: extra,
})
getPcdSpan.End()
if err != nil {
Expand Down
13 changes: 13 additions & 0 deletions components/camera/collectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,14 @@ func newNextPointCloudCollector(resource interface{}, params data.CollectorParam
_, span := trace.StartSpan(ctx, "camera::data::collector::CaptureFunc::NextPointCloud")
defer span.End()

ctx = context.WithValue(ctx, data.FromDMContextKey{}, true)

v, err := camera.NextPointCloud(ctx)
if err != nil {
// If err is from a modular filter component, propagate it to getAndPushNextReading().
if errors.Is(err, data.ErrNoCaptureToStore) {
return nil, data.ErrNoCaptureToStore
}
return nil, data.FailedToReadErr(params.ComponentName, nextPointCloud.String(), err)
}

Expand Down Expand Up @@ -81,8 +87,15 @@ func newReadImageCollector(resource interface{}, params data.CollectorParams) (d
_, span := trace.StartSpan(ctx, "camera::data::collector::CaptureFunc::ReadImage")
defer span.End()

ctx = context.WithValue(ctx, data.FromDMContextKey{}, true)

img, release, err := ReadImage(ctx, camera)
if err != nil {
// If err is from a modular filter component, propagate it to getAndPushNextReading().
if errors.Is(err, data.ErrNoCaptureToStore) {
return nil, data.ErrNoCaptureToStore
}

return nil, data.FailedToReadErr(params.ComponentName, readImage.String(), err)
}
defer func() {
Expand Down
7 changes: 7 additions & 0 deletions components/camera/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
pb "go.viam.com/api/component/camera/v1"
"google.golang.org/genproto/googleapis/api/httpbody"

"go.viam.com/rdk/data"
"go.viam.com/rdk/pointcloud"
"go.viam.com/rdk/protoutils"
"go.viam.com/rdk/resource"
Expand Down Expand Up @@ -71,6 +72,12 @@ func (s *serviceServer) GetImage(
}

req.MimeType = utils.WithLazyMIMEType(req.MimeType)

// Add 'fromDataManagement' to context to avoid threading extra through gostream API.
if req.Extra.AsMap()[data.FromDMString] == true {
ctx = context.WithValue(ctx, data.FromDMContextKey{}, true)
}

img, release, err := ReadImage(gostream.WithMIMETypeHint(ctx, req.MimeType), cam)
if err != nil {
return nil, err
Expand Down
25 changes: 25 additions & 0 deletions data/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ import (
v1 "go.viam.com/api/app/datasync/v1"
"go.viam.com/utils"
"go.viam.com/utils/protoutils"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb"

"go.viam.com/rdk/resource"
Expand All @@ -28,6 +31,15 @@ var sleepCaptureCutoff = 2 * time.Millisecond
// CaptureFunc allows the creation of simple Capturers with anonymous functions.
type CaptureFunc func(ctx context.Context, params map[string]*anypb.Any) (interface{}, error)

// FromDMContextKey is used to check whether the context is from data management.
type FromDMContextKey struct{}

// FromDMString is used to access the 'fromDataManagement' value from a request's Extra struct.
const FromDMString = "fromDataManagement"

// ErrNoCaptureToStore is returned when a modular filter resource filters the capture coming from the base resource.
var ErrNoCaptureToStore = status.Error(codes.FailedPrecondition, "no capture from filter module")

// Collector collects data to some target.
type Collector interface {
Close()
Expand Down Expand Up @@ -186,6 +198,10 @@ func (c *collector) getAndPushNextReading() {
reading, err := c.captureFunc(c.cancelCtx, c.params)
timeReceived := timestamppb.New(c.clock.Now().UTC())
if err != nil {
if errors.Is(err, ErrNoCaptureToStore) {
c.logger.Debugln("capture filtered out by modular resource")
return
}
c.captureErrors <- errors.Wrap(err, "error while capturing data")
return
}
Expand Down Expand Up @@ -290,3 +306,12 @@ func InvalidInterfaceErr(api resource.API) error {
func FailedToReadErr(component, method string, err error) error {
return errors.Errorf("failed to get reading of method %s of component %s: %v", method, component, err)
}

// GetExtraFromContext sets the extra struct with "fromDataManagement": true if the flag is true in the context.
func GetExtraFromContext(ctx context.Context) (*structpb.Struct, error) {
extra := make(map[string]interface{})
if ctx.Value(FromDMContextKey{}) == true {
extra[FromDMString] = true
}
return protoutils.StructToStructPb(extra)
}

0 comments on commit 93a54c7

Please sign in to comment.