s3 ListObject apparently provides invalid JSON? #456

Closed
abalakersky opened this issue Dec 3, 2015 · 13 comments
Labels
guidance Question that needs advice or information.

Comments

@abalakersky

Hello.
I am trying to pull a list of the top-level "directories" in a bucket and range over it, using the following:

svc := s3.New(session.New(&aws.Config{
    Region:      region,
    Credentials: credentials.NewSharedCredentials("", *creds),
}))
result, err := svc.ListObjects(&s3.ListObjectsInput{Bucket: bucket, Delimiter: aws.String("/")})
if err != nil {
    log.Println("Failed to list objects", err)
    return
}
fmt.Println(result)

This prints out the following:

{
  CommonPrefixes: [
    {
      Prefix: "Dir1/"
    },
    {
      Prefix: "Dir2/"
    },
    {
      Prefix: "Dir3/"
    },
    {
      Prefix: "Dir4/"
    }
  ],
  Contents: [{
      ETag: "\"70606180e02c306e61ef7d34ce0406e3\"",
      Key: "PDF_128x128.jpg",
      LastModified: 2015-12-01 17:28:42 +0000 UTC,
      Owner: {
        DisplayName: "username",
        ID: "askldwklj3kljasdius04p02-fsd284rt"
      },
      Size: 9472,
      StorageClass: "STANDARD"
    },{
      ETag: "\"eb716e9a357f22a718cd7ddef1192033\"",
      Key: "jpg_t.jpg",
      LastModified: 2015-12-01 17:28:42 +0000 UTC,
      Owner: {
        DisplayName: "username",
        ID: "dfkljepwjkalsdhsdkljerwpoiajfsajfkld"
      },
      Size: 3361,
      StorageClass: "STANDARD"
    }],
  Delimiter: "/",
  IsTruncated: false,
  Marker: "",
  MaxKeys: 1000,
  Name: "BucketName",
  Prefix: ""
}

I need to iterate separately over the Prefix values in CommonPrefixes, so I can pass each one as the Prefix of a further ListObjects call, and over the Key values in Contents, so I can list those keys. The problem is that json.Unmarshal fails on this output, and every validator I have tried reports it as invalid JSON.

Am I missing something or is there another way to do something like that?

Thank you for your help.

@jasdel
Contributor

jasdel commented Dec 3, 2015

Hi @abalakersky, thanks for contacting us. The string returned by ListObjectsOutput.String() is not JSON; it is a key/value representation of an operation's struct, intended for debug output.

I think the feature you're looking for, clean JSON output, would be implemented by the fix for #271. If a custom JSON encoding marshaller were exposed, or (Un)MarshalJSON() methods were added to the shapes, we could support this.
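
To illustrate the point above: the output is an ordinary Go struct, so its fields can be read directly without parsing the printed text. If JSON is really needed, the standard encoding/json package can encode a struct of this shape. The sketch below is a minimal illustration under assumptions: `commonPrefix`, `listOutput`, `str`, and `toJSON` are hypothetical stand-ins that mirror only the pointer-field layout shown in the debug output, not the SDK's own types, and the resulting JSON layout for the real SDK structs is not verified here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// commonPrefix and listOutput are hypothetical stand-ins that mimic the
// pointer-field shape of the SDK output; they are not SDK types.
type commonPrefix struct {
	Prefix *string
}

type listOutput struct {
	CommonPrefixes []*commonPrefix
}

// str returns a pointer to s, since the fields above are *string.
func str(s string) *string { return &s }

// toJSON encodes the struct with encoding/json instead of relying on
// the debug-oriented String() representation.
func toJSON(out *listOutput) (string, error) {
	b, err := json.Marshal(out)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	out := &listOutput{CommonPrefixes: []*commonPrefix{
		{Prefix: str("Dir1/")},
		{Prefix: str("Dir2/")},
	}}
	s, err := toJSON(out)
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	fmt.Println(s)
}
```

For use inside a single Go program, reading the fields directly is usually simpler than a round trip through JSON.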

@abalakersky
Author

Thank you for your quick reply.
So, is there any other way you could recommend to iterate through the ListObjects output and use each Prefix ("Dir1/", etc.) as a parameter for a new ListObjects call?

Thank you

@jasdel
Contributor

jasdel commented Dec 3, 2015

@abalakersky is your code being used in a chain where the output needs to be a JSON object that will be parsed by another CLI tool?

Also, are you looking to iterate over all objects in your bucket, using the prefixes for additional queries? Or only over a select subset of prefixes?

@abalakersky
Author

@jasdel I am using the code in Go only. I am actually looking to iterate over all objects, using the prefixes to split the work for concurrency. I have done the same in Ruby, using multiple threads for the iteration. I was trying to migrate to Go and got stumped on this point.

@jasdel
Contributor

jasdel commented Dec 3, 2015

Ah I see, great. There are a few ways you could approach this. If you want to list objects in multiple buckets concurrently, you could take an approach like our sample listS3EncryptedObjects, where channels are used to enumerate the objects in the buckets.

Alternatively, to split based on a prefix within a bucket, I suggest pushing the prefixes to a channel and having your parallel worker goroutines read from that channel, making ListObjects calls with the provided prefixes. This would look something like the following:

package main

import (
    "fmt"
    "sync"

    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/s3"
    "github.com/aws/aws-sdk-go/service/s3/s3iface"
)

func main() {
    svc := s3.New(session.New())

    bucket := "mybucket"
    numWorkers := 5

    prefixCh := make(chan string, numWorkers)
    objCh := make(chan *s3.Object, 100)
    var wg sync.WaitGroup
    for i := 0; i < numWorkers; i++ {
        // Spin up each worker
        wg.Add(1)
        go func() {
            listObjectsWorker(objCh, prefixCh, bucket, svc)
            wg.Done()
        }()
    }
    go func() {
        // TODO ListObjects in bucket, and use CommonPrefixes
        // Add each prefix to prefixCh and close it so workers will
        // know when to stop
        close(prefixCh)
    }()
    go func() {
        // Wait until workers are finished then close object channel
        wg.Wait()
        close(objCh)
    }()

    for obj := range objCh {
        // TODO Do something with the objects.
        fmt.Println("Object:", *obj.Key)
    }
}

func listObjectsWorker(objCh chan<- *s3.Object, prefixCh <-chan string, bucket string, svc s3iface.S3API) {
    for prefix := range prefixCh {
        // TODO ListObjects
        // TODO Push each Object Contents to the objCh
        _ = prefix // placeholder so the skeleton compiles until the TODOs are filled in
    }
}

@abalakersky
Author

@jasdel Thank you very much. I am just starting with Go, so this is a great help. One thing I am still struggling with, though, and it is where my original question came from, is how to get the Prefix values out of CommonPrefixes so they can be pushed to prefixCh. I was trying to unmarshal the result of my original ListObjects call, but that did not work, and that's where I got stuck: there seemed to be no way to separate the prefixes well enough to use them in a follow-up command.

@jasdel
Contributor

jasdel commented Dec 3, 2015

Ah, I see. I think you just need to iterate over the CommonPrefixes slice; Prefix is a field on each of its elements.

for _, commonPrefix := range result.CommonPrefixes {
    fmt.Println("Prefix:", *commonPrefix.Prefix)
}
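
Because Prefix is a *string, dereferencing it panics if the pointer is nil. As a hedged sketch of a more defensive version of the loop above, the following collects the prefixes into a plain []string and skips nil entries. The `commonPrefix` type and the `prefixStrings` helper are hypothetical stand-ins written for this illustration, not SDK code; in real code the SDK's aws.StringValue helper serves a similar purpose by returning an empty string for a nil pointer.

```go
package main

import "fmt"

// commonPrefix is a hypothetical stand-in for the SDK element type,
// keeping only the *string Prefix field used in this thread.
type commonPrefix struct {
	Prefix *string
}

// prefixStrings copies the non-nil prefixes into a []string so later
// code can work with plain values instead of pointers.
func prefixStrings(cps []*commonPrefix) []string {
	out := make([]string, 0, len(cps))
	for _, cp := range cps {
		if cp == nil || cp.Prefix == nil {
			continue // skip entries that carry no prefix
		}
		out = append(out, *cp.Prefix)
	}
	return out
}

func main() {
	d1, d2 := "Dir1/", "Dir2/"
	cps := []*commonPrefix{{Prefix: &d1}, nil, {Prefix: &d2}}
	for _, p := range prefixStrings(cps) {
		fmt.Println("Prefix:", p)
	}
}
```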

@jasdel
Contributor

jasdel commented Dec 3, 2015

Here is a full example:

package main

import (
    "fmt"
    "sync"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/s3"
    "github.com/aws/aws-sdk-go/service/s3/s3iface"
)

func main() {
    svc := s3.New(session.New())

    bucket := "mybucket"
    numWorkers := 5

    prefixCh := make(chan string, numWorkers)
    objCh := make(chan *s3.Object, 100)
    var wg sync.WaitGroup
    for i := 0; i < numWorkers; i++ {
        // Spin up each worker
        wg.Add(1)
        go func() {
            listObjectsWorker(objCh, prefixCh, bucket, svc)
            wg.Done()
        }()
    }
    go func() {
        // Wait until workers are finished then close object channel
        wg.Wait()
        close(objCh)
    }()

    go func() {
        if err := getBucketCommonPrefixes(prefixCh, bucket, svc); err != nil {
            fmt.Println("error getting bucket common prefixes", err)
        }
        // Close prefixCh so workers will know when to stop
        close(prefixCh)
    }()

    for obj := range objCh {
        // TODO Do something with the objects.
        fmt.Println("Object:", *obj.Key)
    }
}

func listObjectsWorker(objCh chan<- *s3.Object, prefixCh <-chan string, bucket string, svc s3iface.S3API) {
    for prefix := range prefixCh {
        result, err := svc.ListObjects(&s3.ListObjectsInput{
            Bucket: &bucket, Delimiter: aws.String("/"),
            Prefix: &prefix,
        })
        if err != nil {
            fmt.Println("failed to list objects by prefix", prefix, err)
            continue
        }
        for _, obj := range result.Contents {
            objCh <- obj
        }
    }
}

func getBucketCommonPrefixes(prefixCh chan<- string, bucket string, svc s3iface.S3API) error {
    result, err := svc.ListObjects(&s3.ListObjectsInput{
        Bucket: &bucket, Delimiter: aws.String("/"),
    })
    if err != nil {
        return err
    }

    for _, commonPrefix := range result.CommonPrefixes {
        prefixCh <- *commonPrefix.Prefix
    }

    return nil
}

@abalakersky
Author

Ah, this is fantastic. Thank you for all your help. I definitely got ways to go with go :) but I love it so far.

@jasdel added the guidance label Dec 3, 2015
@jasdel
Contributor

jasdel commented Dec 4, 2015

Glad to help, @abalakersky. Let us know if you have any additional questions, or if there is anything else we can help you with. I'll add this code to our examples section so it is available to others looking for how to use the SDK concurrently.

@jasdel closed this as completed Dec 4, 2015
@abalakersky
Author

@jasdel Thank you again for all your help. Here is today's final version 😃 that I came up with. Please see if it would be useful. I also incorporated an option to save a manifest and a basic case-insensitive search. BTW, if you know of a better way to do either, I would greatly appreciate any advice.

package main

import (
    "time"
    "strconv"
    "flag"
    "fmt"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/credentials"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/s3"
    "sync"
    "os"
    "log"
    "path/filepath"
    "github.com/aws/aws-sdk-go/service/s3/s3iface"
    "bufio"
    "strings"
)

var (
    bucket = flag.String("bucket", "", "Bucket Name to list objects from. REQUIRED")
    region = flag.String("region", "us-east-1", "Region to connect to.")
    creds = flag.String("creds", "default", "Credentials Profile to use")
    search = flag.String("search", "", "Search string to find in object paths")
    t = time.Now()
    dir, _ = filepath.Abs(filepath.Dir(os.Args[0]))
    name = dir + "/" + *bucket + "_" + *search + strconv.FormatInt(t.Unix(), 10) + ".log"
)


func listObjectsWorker(objCh chan <- *s3.Object, prefix string, bucket *string, svc s3iface.S3API) {
    params := &s3.ListObjectsInput{
        Bucket: bucket,
        Prefix: &prefix,
    }
    err := svc.ListObjectsPages(params,
        func(page *s3.ListObjectsOutput, last bool) bool {
            for _, object := range page.Contents {
                objCh <- object
            }
            return true
        },
    )

    if err != nil {
        fmt.Println("failed to list objects by prefix", prefix, err)
    }
}

func CaseInsesitiveContains (s, substr string) bool {
    s, substr = strings.ToUpper(s), strings.ToUpper(substr)
    return strings.Contains(s, substr)
}

func main() {
    flag.Parse()
    svc := s3.New(session.New(&aws.Config{
        Region:      region,
        Credentials: credentials.NewSharedCredentials("", *creds),
    }))

    if *bucket == "" {
        fmt.Printf("\n%s\n\n", "You Need to specify name of the Bucket to scan")
        return
    }

    f, err := os.Create(name)
    if err != nil {
        panic(err)
    }
    defer f.Close()

    topLevel, err := svc.ListObjects(&s3.ListObjectsInput{Bucket: bucket, Delimiter: aws.String("/")})
    if err != nil {
        log.Println("Failed to list Top Level objects", err)
        return
    }
    for _, contentKeys := range topLevel.Contents {
        fmt.Println(*contentKeys.Key)
    }

    objCh := make(chan *s3.Object, 10)
    var wg sync.WaitGroup

    for _, commonPrefix := range topLevel.CommonPrefixes {
//      fmt.Println(commonPrefix.Prefix)
        wg.Add(1)
        go func() {
            defer wg.Done()
            listObjectsWorker(objCh, *commonPrefix.Prefix, bucket, svc)
        }()
        go func() {
            wg.Wait()
            close(objCh)
        }()
    }
//  for obj := range objCh {
//      fmt.Println(*obj.Key)
//  }
    w := bufio.NewWriter(f)
    for obj := range objCh {
        switch  {
        case *search == "" :
            fmt.Println(*obj.Key)
            w.WriteString(*obj.Key + "\n")
        case *search != "" :
            if CaseInsesitiveContains(*obj.Key, *search) == true {
                fmt.Println(*obj.Key)
                w.WriteString(*obj.Key + "\n")
            } else {
                continue
            }
        }
    }
    w.Flush()
}

@jasdel
Contributor

jasdel commented Dec 4, 2015

@abalakersky overall it looks OK. Were you able to resolve the pausing your code was running into? Based on your description, it sounds like your code is being throttled by S3 for making so many requests at once. Here are a few comments, though:

  • I'd suggest running your code with the -race flag to detect any race conditions.
  • In the obj for loop, w.WriteString(*obj.Key + "\n") can be replaced with fmt.Fprintln(w, *obj.Key). This saves a few allocations, since it doesn't need to create the temporary strings.
  • The for _, commonPrefix := range topLevel.CommonPrefixes loop might be producing unexpected prefixes, because commonPrefix is not evaluated until after the goroutine starts, and a for loop's range reuses the variable it assigns on each iteration. See https://github.com/golang/go/wiki/CommonMistakes#using-goroutines-on-loop-iterator-variables
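
The loop-variable point can be sketched in isolation. In the minimal example below, each goroutine receives the current value as a function argument, so it works on its own copy rather than on a shared variable. The function `visitAll` and its names are hypothetical and exist only for this illustration; it records which prefix each goroutine saw. Go 1.22 and later give each iteration its own loop variable, so the issue applies to older toolchains, but passing the value explicitly is valid in either case.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// visitAll starts one goroutine per prefix and records the prefix each
// goroutine observed. The value is passed as an argument, so every
// goroutine gets its own copy of the loop variable.
func visitAll(prefixes []string) []string {
	var (
		mu   sync.Mutex
		seen []string
		wg   sync.WaitGroup
	)
	for _, p := range prefixes {
		wg.Add(1)
		go func(prefix string) {
			defer wg.Done()
			mu.Lock()
			seen = append(seen, prefix)
			mu.Unlock()
		}(p) // evaluated now, not when the goroutine eventually runs
	}
	wg.Wait()
	sort.Strings(seen) // goroutine order is not deterministic
	return seen
}

func main() {
	fmt.Println(visitAll([]string{"Dir3/", "Dir1/", "Dir2/"}))
}
```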

@abalakersky
Author

@jasdel Thank you again for your help. I've made the changes you mentioned and also run the code with -race. I found a couple of race conditions as well as a close on a closed channel. It took me a while to figure out the close on a closed channel, but Dave Cheney's article helped: http://dave.cheney.net/2014/03/19/channel-axioms
I think I found everything, and testing so far is successful on Linux and Windows.
I would really appreciate your opinion on the script, if you have a few minutes.
Thank you again.

package main

import (
    "bufio"
    "flag"
    "fmt"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/credentials"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/s3"
    "github.com/aws/aws-sdk-go/service/s3/s3iface"
    "log"
    "os"
    "path/filepath"
    "strconv"
    "strings"
    "sync"
    "time"
)

var (
    bucket = flag.String("bucket", "", "Bucket Name to list objects from. REQUIRED")
    region = flag.String("region", "us-east-1", "Region to connect to.")
    creds  = flag.String("creds", "default", "Credentials Profile to use")
    search = flag.String("search", "", "Search string to find in object paths")
    t      = time.Now()
    dir, _ = filepath.Abs(filepath.Dir(os.Args[0]))
)

func caseInsesitiveContains(s, substr string) bool {
    s, substr = strings.ToUpper(s), strings.ToUpper(substr)
    return strings.Contains(s, substr)
}

func main() {
    flag.Parse()
    svc := s3.New(session.New(&aws.Config{
        Region:      region,
        Credentials: credentials.NewSharedCredentials("", *creds),
    }))

    if *bucket == "" {
        fmt.Printf("\n%s\n\n", "You Need to specify name of the Bucket to scan")
        return
    }
    var s string
    if *search != "" {
        s = *search
    }

    name := dir + "/" + *bucket + "_" + s + "_" + strconv.FormatInt(t.Unix(), 10) + ".log"

    f, err := os.Create(name)
    if err != nil {
        panic(err)
    }
    defer f.Close()
    w := bufio.NewWriter(f)

    topLevel, err := svc.ListObjects(&s3.ListObjectsInput{Bucket: bucket, Delimiter: aws.String("/")})
    if err != nil {
        log.Println("Failed to list Top Level objects", err)
        return
    }
    for _, contentKeys := range topLevel.Contents {
        switch {
        case *search == "":
            fmt.Fprintln(w, *contentKeys.Key)
        case *search != "":
            if caseInsesitiveContains(*contentKeys.Key, *search) == true {
                fmt.Fprintln(w, *contentKeys.Key)
            } else {
                continue
            }
        }
    }

    var prefixes []string
    for _, commonPrefix := range topLevel.CommonPrefixes {
        prefixes = append(prefixes, *commonPrefix.Prefix)
    }

    objCh := make(chan *s3.Object, 10)
    var wg sync.WaitGroup

    listObjectsWorker := func(objCh chan<- *s3.Object, prefix string, bucket *string, svc s3iface.S3API) {
        params := &s3.ListObjectsInput{
            Bucket: bucket,
            Prefix: &prefix,
        }
        err := svc.ListObjectsPages(params,
            func(page *s3.ListObjectsOutput, last bool) bool {
                for _, object := range page.Contents {
                    objCh <- object
                    //              objCh <- fmt.Sprintf("%s", *object.Key)
                }
                return true
            },
        )

        if err != nil {
            fmt.Println("failed to list objects by prefix", prefix, err)
        }
        wg.Done()
    }

    wg.Add(len(prefixes))

    for i := range prefixes {
        prefix := prefixes[i]
        go listObjectsWorker(objCh, prefix, bucket, svc)
    }

    go func() {
        wg.Wait()
        close(objCh)
    }()

    for obj := range objCh {
        switch {
        case *search == "":
            fmt.Fprintln(w, *obj.Key)
            //              fmt.Println(*obj.Key)
        case *search != "":
            if caseInsesitiveContains(*obj.Key, *search) == true {
                fmt.Fprintln(w, *obj.Key)
                //              fmt.Println(*obj.Key)
            } else {
                continue
            }
        }
    }
    w.Flush()
}
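
The "close on a closed channel" panic mentioned above occurs when more than one goroutine calls close on the same channel, for example when the closing goroutine is started inside the loop that launches the workers. The version above avoids this by starting a single closer after the loop. A stripped-down sketch of that pattern, with no S3 calls, follows; `fanIn` and the `list` callback are hypothetical names used only for this illustration, with `list` standing in for the per-prefix listing.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// fanIn runs list once per prefix in its own goroutine, funnels the
// results through one channel, and closes that channel exactly once.
func fanIn(prefixes []string, list func(prefix string) []string) []string {
	keyCh := make(chan string, 10)
	var wg sync.WaitGroup

	wg.Add(len(prefixes))
	for _, p := range prefixes {
		go func(prefix string) {
			defer wg.Done()
			for _, k := range list(prefix) {
				keyCh <- k
			}
		}(p)
	}

	// A single closer, started outside the loop: it waits for every
	// worker and then closes the channel once.
	go func() {
		wg.Wait()
		close(keyCh)
	}()

	var keys []string
	for k := range keyCh {
		keys = append(keys, k)
	}
	sort.Strings(keys) // arrival order depends on goroutine scheduling
	return keys
}

func main() {
	// fake stands in for a real listing call.
	fake := func(prefix string) []string {
		return []string{prefix + "a.jpg", prefix + "b.jpg"}
	}
	fmt.Println(fanIn([]string{"Dir1/", "Dir2/"}, fake))
}
```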
