RSDK-3795 - Switch from deprecated file IDs to binary IDs (#2594)
agreenb authored Jun 29, 2023
1 parent 5518c51 commit 3c2b24c
Showing 3 changed files with 40 additions and 16 deletions.
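At a glance: the deprecated FileIds field on BinaryDataByIDsRequest is replaced with BinaryIds, where each *datapb.BinaryID pairs a file ID with the organization and location it was captured under. A minimal sketch of the new request shape, assuming the datapb alias points at the go.viam.com/api/app/data/v1 protos; fileID, orgID, and locationID are placeholder values, and the function name is illustrative:

    package example

    import (
        datapb "go.viam.com/api/app/data/v1" // assumed import path for the data API protos
    )

    // newBinaryDataByIDsRequest builds a request using the non-deprecated
    // BinaryIds field; the three string arguments are placeholder identifiers.
    func newBinaryDataByIDsRequest(fileID, orgID, locationID string) *datapb.BinaryDataByIDsRequest {
        return &datapb.BinaryDataByIDsRequest{
            BinaryIds: []*datapb.BinaryID{{
                FileId:         fileID,
                OrganizationId: orgID,
                LocationId:     locationID,
            }},
            IncludeBinary: true,
        }
    }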
23 changes: 14 additions & 9 deletions cli/data.go
@@ -44,7 +44,7 @@ func (c *AppClient) BinaryData(dst string, filter *datapb.Filter, parallelDownlo
parallelDownloads = defaultParallelDownloads
}

- ids := make(chan string, parallelDownloads)
+ ids := make(chan *datapb.BinaryID, parallelDownloads)
// Give channel buffer of 1+parallelDownloads because that is the number of goroutines that may be passing an
// error into this channel (1 get ids routine + parallelDownloads download routines).
errs := make(chan error, 1+parallelDownloads)
@@ -73,7 +73,7 @@ func (c *AppClient) BinaryData(dst string, filter *datapb.Filter, parallelDownlo
wg.Add(1)
go func() {
defer wg.Done()
- var nextID string
+ var nextID *datapb.BinaryID
var done bool
var numFilesDownloaded atomic.Int32
var downloadWG sync.WaitGroup
@@ -88,14 +88,14 @@ func (c *AppClient) BinaryData(dst string, filter *datapb.Filter, parallelDownlo

nextID = <-ids

- // If nextID is zero value, the channel has been closed and there are no more IDs to be read.
- if nextID == "" {
+ // If nextID is nil, the channel has been closed and there are no more IDs to be read.
+ if nextID == nil {
done = true
break
}

downloadWG.Add(1)
- go func(id string) {
+ go func(id *datapb.BinaryID) {
defer downloadWG.Done()
err := downloadBinary(ctx, c.dataClient, dst, id)
if err != nil {
@@ -130,7 +130,7 @@ func (c *AppClient) BinaryData(dst string, filter *datapb.Filter, parallelDownlo

// getMatchingIDs queries client for all BinaryData matching filter, and passes each of their ids into ids.
func getMatchingBinaryIDs(ctx context.Context, client datapb.DataServiceClient, filter *datapb.Filter,
- ids chan string, limit uint,
+ ids chan *datapb.BinaryID, limit uint,
) error {
var last string
defer close(ids)
@@ -158,17 +158,22 @@ func getMatchingBinaryIDs(ctx context.Context, client datapb.DataServiceClient,
last = resp.GetLast()

for _, bd := range resp.GetData() {
- ids <- bd.GetMetadata().GetId()
+ md := bd.GetMetadata()
+ ids <- &datapb.BinaryID{
+     FileId:         md.GetId(),
+     OrganizationId: md.GetCaptureMetadata().GetOrgId(),
+     LocationId:     md.GetCaptureMetadata().GetLocationId(),
+ }
}
}
}

- func downloadBinary(ctx context.Context, client datapb.DataServiceClient, dst, id string) error {
+ func downloadBinary(ctx context.Context, client datapb.DataServiceClient, dst string, id *datapb.BinaryID) error {
var resp *datapb.BinaryDataByIDsResponse
var err error
for count := 0; count < maxRetryCount; count++ {
resp, err = client.BinaryDataByIDs(ctx, &datapb.BinaryDataByIDsRequest{
- FileIds: []string{id},
+ BinaryIds: []*datapb.BinaryID{id},
IncludeBinary: true,
})
if err == nil {
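A side effect of switching the CLI pipeline from chan string to chan *datapb.BinaryID is the closed-channel sentinel: receiving from a closed pointer channel yields nil rather than "", which is what the updated check above relies on. A minimal sketch of that consumer loop, under the same datapb import assumption; the drainIDs name is illustrative:

    package example

    import (
        "fmt"

        datapb "go.viam.com/api/app/data/v1" // assumed import path
    )

    // drainIDs receives binary IDs until the producer closes the channel.
    // Once the channel is closed and empty, the receive returns the zero
    // value, which for a pointer element type is nil.
    func drainIDs(ids <-chan *datapb.BinaryID) {
        for {
            next := <-ids
            if next == nil { // channel closed, nothing left to download
                return
            }
            fmt.Println("would download", next.GetFileId())
        }
    }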
11 changes: 8 additions & 3 deletions components/camera/replaypcd/replaypcd.go
@@ -62,7 +62,7 @@ type TimeInterval struct {
// cacheEntry stores data that was downloaded from a previous operation but has not yet been passed
// to the caller.
type cacheEntry struct {
- id string
+ id *datapb.BinaryID
pc pointcloud.PointCloud
timeRequested *timestamppb.Timestamp
timeReceived *timestamppb.Timestamp
@@ -200,7 +200,7 @@ func (replay *pcdCamera) NextPointCloud(ctx context.Context) (pointcloud.PointCl
// data in parallel and cache the results
replay.cache = make([]*cacheEntry, len(resp.Data))
for i, dataResponse := range resp.Data {
- replay.cache[i] = &cacheEntry{id: dataResponse.GetMetadata().Id}
+ md := dataResponse.GetMetadata()
+ replay.cache[i] = &cacheEntry{id: &datapb.BinaryID{
+     FileId:         md.GetId(),
+     OrganizationId: md.GetCaptureMetadata().GetOrgId(),
+     LocationId:     md.GetCaptureMetadata().GetLocationId(),
+ }}
}

ctxTimeout, cancelTimeout := context.WithTimeout(ctx, downloadTimeout)
@@ -227,7 +232,7 @@ func (replay *pcdCamera) downloadBatch(ctx context.Context) {

var resp *datapb.BinaryDataByIDsResponse
resp, data.err = replay.dataClient.BinaryDataByIDs(ctx, &datapb.BinaryDataByIDsRequest{
- FileIds: []string{data.id},
+ BinaryIds: []*datapb.BinaryID{data.id},
IncludeBinary: true,
})
if data.err != nil {
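Both the CLI and the replay camera now derive a *datapb.BinaryID from the BinaryMetadata returned by the data service in the same way. A hypothetical helper, not part of this commit, that captures that conversion under the same import assumption:

    package example

    import (
        datapb "go.viam.com/api/app/data/v1" // assumed import path
    )

    // binaryIDFromMetadata combines the file ID with the org and location
    // recorded in the capture metadata, which together address the blob.
    func binaryIDFromMetadata(md *datapb.BinaryMetadata) *datapb.BinaryID {
        return &datapb.BinaryID{
            FileId:         md.GetId(),
            OrganizationId: md.GetCaptureMetadata().GetOrgId(),
            LocationId:     md.GetCaptureMetadata().GetLocationId(),
        }
    }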
22 changes: 18 additions & 4 deletions components/camera/replaypcd/replaypcd_utils_test.go
@@ -33,7 +33,11 @@ import (
"go.viam.com/rdk/testutils/inject"
)

- const testTime = "2000-01-01T12:00:%02dZ"
+ const (
+     testTime   = "2000-01-01T12:00:%02dZ"
+     orgID      = "slam_org_id"
+     locationID = "slam_location_id"
+ )

// mockDataServiceServer is a struct that includes unimplemented versions of all the Data Service endpoints. These
// can be overwritten to allow developers to trigger desired behaviors during testing.
@@ -46,9 +50,7 @@ type mockDataServiceServer struct {
func (mDServer *mockDataServiceServer) BinaryDataByIDs(ctx context.Context, req *datapb.BinaryDataByIDsRequest,
) (*datapb.BinaryDataByIDsResponse, error) {
// Parse request
- // TODO(RSDK-3795): Update BinaryDataByIDs test to match newest API
- //nolint: staticcheck
- fileID := req.FileIds[0]
+ fileID := req.BinaryIds[0].GetFileId()

data, err := getCompressedBytesFromArtifact(fileID)
if err != nil {
@@ -71,6 +73,10 @@ func (mDServer *mockDataServiceServer) BinaryDataByIDs(ctx context.Context, req
Id: fileID,
TimeRequested: timeReq,
TimeReceived: timeRec,
+ CaptureMetadata: &datapb.CaptureMetadata{
+     OrgId:      orgID,
+     LocationId: locationID,
+ },
},
}
resp := &datapb.BinaryDataByIDsResponse{
@@ -113,6 +119,10 @@ func (mDServer *mockDataServiceServer) BinaryDataByFilter(ctx context.Context, r
Id: fmt.Sprintf(datasetDirectory, newFileNum),
TimeRequested: timeReq,
TimeReceived: timeRec,
+ CaptureMetadata: &datapb.CaptureMetadata{
+     OrgId:      orgID,
+     LocationId: locationID,
+ },
},
}

@@ -133,6 +143,10 @@ func (mDServer *mockDataServiceServer) BinaryDataByFilter(ctx context.Context, r
Id: fmt.Sprintf(datasetDirectory, newFileNum+i),
TimeRequested: timeReq,
TimeReceived: timeRec,
+ CaptureMetadata: &datapb.CaptureMetadata{
+     OrgId:      orgID,
+     LocationId: locationID,
+ },
},
}
resp.Data = append(resp.Data, &binaryData)
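For context on how a mock like mockDataServiceServer can be exercised, the sketch below serves a DataServiceServer over an in-process gRPC listener and returns a DataServiceClient. It assumes the stock google.golang.org/grpc stack and is not necessarily how the repository's own test harness is wired; startMockDataService is an illustrative name.

    package example

    import (
        "context"
        "net"
        "testing"

        datapb "go.viam.com/api/app/data/v1" // assumed import path
        "google.golang.org/grpc"
        "google.golang.org/grpc/credentials/insecure"
    )

    // startMockDataService registers srv on a local gRPC server and returns a
    // client connected to it; server and connection are torn down with the test.
    func startMockDataService(t *testing.T, srv datapb.DataServiceServer) datapb.DataServiceClient {
        t.Helper()

        lis, err := net.Listen("tcp", "localhost:0")
        if err != nil {
            t.Fatal(err)
        }
        grpcServer := grpc.NewServer()
        datapb.RegisterDataServiceServer(grpcServer, srv)
        go func() { _ = grpcServer.Serve(lis) }() // serve until the test stops it

        conn, err := grpc.DialContext(context.Background(), lis.Addr().String(),
            grpc.WithTransportCredentials(insecure.NewCredentials()))
        if err != nil {
            t.Fatal(err)
        }
        t.Cleanup(func() {
            _ = conn.Close()
            grpcServer.Stop()
        })
        return datapb.NewDataServiceClient(conn)
    }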
