Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle PutObjectRetention event notifications in mirror #2966

Merged
merged 1 commit into from
Nov 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cmd/client-fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,13 @@ func (f *fsClient) Put(ctx context.Context, reader io.Reader, size int64, metada
return f.put(reader, size, nil, progress)
}

func (f *fsClient) PutRetention(ctx context.Context, metadata map[string]string) *probe.Error {
return probe.NewError(APINotImplemented{
API: "PutRetention",
APIType: "filesystem",
})
}

// ShareDownload - share download not implemented for filesystem.
func (f *fsClient) ShareDownload(expires time.Duration) (string, *probe.Error) {
return "", probe.NewError(APINotImplemented{
Expand Down
81 changes: 72 additions & 9 deletions cmd/client-s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,16 +652,27 @@ func (c *s3Client) Watch(params watchParams) (*watchObject, *probe.Error) {
u := *c.targetURL
u.Path = path.Join(string(u.Separator), bucketName, key)
if strings.HasPrefix(record.EventName, "s3:ObjectCreated:") {
eventChan <- EventInfo{
Time: record.EventTime,
Size: record.S3.Object.Size,
Path: u.String(),
Type: EventCreate,
Host: record.Source.Host,
Port: record.Source.Port,
UserAgent: record.Source.UserAgent,
if strings.HasPrefix(record.EventName, "s3:ObjectCreated:PutRetention") {
eventChan <- EventInfo{
Time: record.EventTime,
Size: record.S3.Object.Size,
Path: u.String(),
Type: EventCreatePutRetention,
Host: record.Source.Host,
Port: record.Source.Port,
UserAgent: record.Source.UserAgent,
}
} else {
eventChan <- EventInfo{
Time: record.EventTime,
Size: record.S3.Object.Size,
Path: u.String(),
Type: EventCreate,
Host: record.Source.Host,
Port: record.Source.Port,
UserAgent: record.Source.UserAgent,
}
}

} else if strings.HasPrefix(record.EventName, "s3:ObjectRemoved:") {
eventChan <- EventInfo{
Time: record.EventTime,
Expand Down Expand Up @@ -884,6 +895,58 @@ func (c *s3Client) Put(ctx context.Context, reader io.Reader, size int64, metada
}
return n, nil
}
func (c *s3Client) PutRetention(ctx context.Context, metadata map[string]string) *probe.Error {
bucket, object := c.url2BucketAndObject()

if bucket == "" {
return probe.NewError(BucketNameEmpty{})
}
lockModeStr, ok := metadata[AmzObjectLockMode]
lockMode := minio.RetentionMode("")
if ok {
lockMode = minio.RetentionMode(lockModeStr)
delete(metadata, AmzObjectLockMode)
}

retainUntilDateStr, ok := metadata[AmzObjectLockRetainUntilDate]
retainUntilDate := timeSentinel
if ok {
delete(metadata, AmzObjectLockRetainUntilDate)
if t, e := time.Parse(time.RFC3339, retainUntilDateStr); e == nil {
retainUntilDate = t.UTC()
}
}
opts := minio.PutObjectRetentionOptions{Mode: &lockMode, RetainUntilDate: &retainUntilDate, GovernanceBypass: false}
e := c.api.PutObjectRetention(bucket, object, opts)
if e != nil {
errResponse := minio.ToErrorResponse(e)
if errResponse.Code == "AccessDenied" {
return probe.NewError(PathInsufficientPermission{
Path: c.targetURL.String(),
})
}
if errResponse.Code == "MethodNotAllowed" {
return probe.NewError(ObjectAlreadyExists{
Object: object,
})
}
if errResponse.Code == "NoSuchBucket" {
return probe.NewError(BucketDoesNotExist{
Bucket: bucket,
})
}
if errResponse.Code == "InvalidBucketName" {
return probe.NewError(BucketInvalid{
Bucket: bucket,
})
}
if errResponse.Code == "NoSuchKey" {
return probe.NewError(ObjectMissing{})
}
return probe.NewError(e)
}
return nil
}

// Remove incomplete uploads.
func (c *s3Client) removeIncompleteObjects(bucket string, objectsCh <-chan string) <-chan minio.RemoveObjectError {
Expand Down
3 changes: 2 additions & 1 deletion cmd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type Client interface {
// I/O operations with metadata.
Get(sse encrypt.ServerSide) (reader io.ReadCloser, err *probe.Error)
Put(ctx context.Context, reader io.Reader, size int64, metadata map[string]string, progress io.Reader, sse encrypt.ServerSide) (n int64, err *probe.Error)

PutRetention(ctx context.Context, metadata map[string]string) *probe.Error
// I/O operations with expiration
ShareDownload(expires time.Duration) (string, *probe.Error)
ShareUpload(bool, time.Duration, string) (string, map[string]string, *probe.Error)
Expand All @@ -93,6 +93,7 @@ type clientContent struct {
ETag string
Expires time.Time
EncryptionHeaders map[string]string
Retention bool
Err *probe.Error
}

Expand Down
28 changes: 26 additions & 2 deletions cmd/common-methods.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,18 @@ func getSourceStream(alias string, urlStr string, fetchStat bool, sse encrypt.Se
return reader, metadata, nil
}

// putTargetRetention sets retention headers if any
func putTargetRetention(ctx context.Context, alias string, urlStr string, metadata map[string]string) *probe.Error {
targetClnt, err := newClientFromAlias(alias, urlStr)
if err != nil {
return err.Trace(alias, urlStr)
}
if err := targetClnt.PutRetention(ctx, metadata); err != nil {
return err.Trace(alias, urlStr)
}
return nil
}

// putTargetStream writes to URL from Reader.
func putTargetStream(ctx context.Context, alias string, urlStr string, reader io.Reader, size int64, metadata map[string]string, progress io.Reader, sse encrypt.ServerSide) (int64, *probe.Error) {
targetClnt, err := newClientFromAlias(alias, urlStr)
Expand Down Expand Up @@ -298,7 +310,6 @@ func uploadSourceToTargetURL(ctx context.Context, urls URLs, progress io.Reader,
targetAlias := urls.TargetAlias
targetURL := urls.TargetContent.URL
length := urls.SourceContent.Size

sourcePath := filepath.ToSlash(filepath.Join(sourceAlias, urls.SourceContent.URL.Path))
targetPath := filepath.ToSlash(filepath.Join(targetAlias, urls.TargetContent.URL.Path))

Expand Down Expand Up @@ -326,10 +337,23 @@ func uploadSourceToTargetURL(ctx context.Context, urls URLs, progress io.Reader,
}

sourcePath := filepath.ToSlash(sourceURL.Path)
if urls.SourceContent.Retention {
err = putTargetRetention(ctx, targetAlias, targetURL.String(), metadata)
return urls.WithError(err.Trace(sourceURL.String()))
}
err = copySourceToTargetURL(targetAlias, targetURL.String(), sourcePath, length,
progress, srcSSE, tgtSSE, filterMetadata(metadata))
} else {

if len(metadata) == 0 {
metadata, err = getAllMetadata(sourceAlias, sourceURL.String(), srcSSE, urls)
if err != nil {
return urls.WithError(err.Trace(sourceURL.String()))
}
}
if urls.SourceContent.Retention {
err = putTargetRetention(ctx, targetAlias, targetURL.String(), metadata)
return urls.WithError(err.Trace(sourceURL.String()))
}
var reader io.ReadCloser
// Proceed with regular stream copy.
reader, metadata, err = getSourceStream(sourceAlias, sourceURL.String(), true, srcSSE)
Expand Down
13 changes: 9 additions & 4 deletions cmd/mirror-main.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,12 +398,13 @@ func (mj *mirrorJob) watchMirror(ctx context.Context, cancelMirror context.Cance
srcSSE := getSSE(sourcePath, mj.encKeyDB[sourceAlias])
tgtSSE := getSSE(targetPath, mj.encKeyDB[targetAlias])

if event.Type == EventCreate {
if (event.Type == EventCreate) ||
(event.Type == EventCreatePutRetention) {
// we are checking if a destination file exists now, and if we only
// overwrite it when force is enabled.
mirrorURL := URLs{
SourceAlias: sourceAlias,
SourceContent: &clientContent{URL: *sourceURL},
SourceContent: &clientContent{URL: *sourceURL, Retention: event.Type == EventCreatePutRetention},
TargetAlias: targetAlias,
TargetContent: &clientContent{URL: *targetURL},
encKeyDB: mj.encKeyDB,
Expand All @@ -430,7 +431,7 @@ func (mj *mirrorJob) watchMirror(ctx context.Context, cancelMirror context.Cance
shouldQueue := false
if !mj.isOverwrite {
_, err = targetClient.Stat(false, false, false, tgtSSE)
if err == nil {
if err == nil || event.Type != EventCreatePutRetention {
continue
} // doesn't exist
shouldQueue = true
Expand All @@ -454,7 +455,11 @@ func (mj *mirrorJob) watchMirror(ctx context.Context, cancelMirror context.Cance
}
_, err = targetClient.Stat(false, false, false, tgtSSE)
if err == nil {
continue
if event.Type == EventCreatePutRetention {
shouldQueue = true
} else {
continue
}
} // doesn't exist
shouldQueue = true
}
Expand Down
3 changes: 2 additions & 1 deletion cmd/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ type EventType string
const (
// EventCreate notifies when a new object is created
EventCreate EventType = "ObjectCreated"
// EventCreatePutRetention notifies when a retention configuration is added to an object
EventCreatePutRetention EventType = "ObjectCreated:PutRetention"
// EventRemove notifies when a new object is deleted
EventRemove = "ObjectRemoved"
// EventAccessed notifies when an object is accessed.
Expand Down Expand Up @@ -167,7 +169,6 @@ func (w *Watcher) Join(client Client, recursive bool) *probe.Error {
if !ok {
return
}

w.eventInfoChan <- event
case err, ok := <-wo.Errors():
if !ok {
Expand Down