From 18ad0d06ae3d5e6832b92faebafdcdbc9714140e Mon Sep 17 00:00:00 2001 From: Jeff Mendoza Date: Tue, 5 Dec 2023 10:15:15 -0800 Subject: [PATCH] Change Keys method in kv interface to Scan (#1558) Keys was a placeholder, as an external keyvalue may have more keys than we want to read at once. Additionally we don't want to overload the external store with an operation that will block for a long time. Now the Scan interface will only get some number of keys at a time. Implementations for Redis and TiKV have been added. Signed-off-by: Jeff Mendoza --- internal/testing/stablememmap/stablememmap.go | 28 ++++++++--- pkg/assembler/backends/keyvalue/artifact.go | 43 +++++++++------- pkg/assembler/backends/keyvalue/builder.go | 19 ++++--- pkg/assembler/backends/keyvalue/certifyBad.go | 24 +++++---- .../backends/keyvalue/certifyGood.go | 24 +++++---- .../backends/keyvalue/certifyLegal.go | 24 +++++---- .../backends/keyvalue/certifyScorecard.go | 24 +++++---- .../backends/keyvalue/certifyVEXStatement.go | 24 +++++---- .../backends/keyvalue/certifyVuln.go | 24 +++++---- .../backends/keyvalue/hasMetadata.go | 24 +++++---- pkg/assembler/backends/keyvalue/hasSBOM.go | 24 +++++---- pkg/assembler/backends/keyvalue/hasSLSA.go | 24 +++++---- .../backends/keyvalue/hasSourceAt.go | 24 +++++---- pkg/assembler/backends/keyvalue/hashEqual.go | 24 +++++---- .../backends/keyvalue/isDependency.go | 24 +++++---- .../backends/keyvalue/isOccurrence.go | 24 +++++---- pkg/assembler/backends/keyvalue/license.go | 27 ++++++---- pkg/assembler/backends/keyvalue/pkg.go | 32 +++++++----- pkg/assembler/backends/keyvalue/pkgEqual.go | 24 +++++---- .../backends/keyvalue/pointOfContact.go | 24 +++++---- pkg/assembler/backends/keyvalue/src.go | 32 +++++++----- pkg/assembler/backends/keyvalue/vulnEqual.go | 24 +++++---- .../backends/keyvalue/vulnMetadata.go | 24 +++++---- .../backends/keyvalue/vulnerability.go | 32 +++++++----- pkg/assembler/kv/kv.go | 17 +++++-- pkg/assembler/kv/memmap/memmap.go | 35 ++++++++++--- pkg/assembler/kv/redis/redis.go | 42 +++++++++++++--- pkg/assembler/kv/tikv/tikv.go | 50 +++++++++++++++---- 28 files changed, 500 insertions(+), 265 deletions(-) diff --git a/internal/testing/stablememmap/stablememmap.go b/internal/testing/stablememmap/stablememmap.go index b3287e0dc4..08b747b13f 100644 --- a/internal/testing/stablememmap/stablememmap.go +++ b/internal/testing/stablememmap/stablememmap.go @@ -17,35 +17,47 @@ package stablememmap import ( "context" + "errors" "slices" "github.com/guacsec/guac/pkg/assembler/kv" "github.com/guacsec/guac/pkg/assembler/kv/memmap" ) -type Store struct { +type store struct { mm kv.Store } func GetStore() kv.Store { - return &Store{ + return &store{ mm: memmap.GetStore(), } } -func (s *Store) Get(ctx context.Context, c, k string, v any) error { +func (s *store) Get(ctx context.Context, c, k string, v any) error { return s.mm.Get(ctx, c, k, v) } -func (s *Store) Set(ctx context.Context, c, k string, v any) error { +func (s *store) Set(ctx context.Context, c, k string, v any) error { return s.mm.Set(ctx, c, k, v) } -func (s *Store) Keys(ctx context.Context, c string) ([]string, error) { - keys, err := s.mm.Keys(ctx, c) +func (s *store) Keys(c string) kv.Scanner { + return &scanner{mms: s.mm.Keys(c)} +} + +type scanner struct { + mms kv.Scanner +} + +func (s *scanner) Scan(ctx context.Context) ([]string, bool, error) { + keys, done, err := s.mms.Scan(ctx) if err != nil { - return nil, err + return nil, false, err + } + if !done { + return nil, false, errors.New("Expect memmap to always return all keys at once") } slices.Sort(keys) - return keys, nil + return keys, true, nil } diff --git a/pkg/assembler/backends/keyvalue/artifact.go b/pkg/assembler/backends/keyvalue/artifact.go index f92f078b64..2b9d214482 100644 --- a/pkg/assembler/backends/keyvalue/artifact.go +++ b/pkg/assembler/backends/keyvalue/artifact.go @@ -239,28 +239,33 @@ func (c *demoClient) Artifacts(ctx context.Context, artifactSpec *model.Artifact algorithm := strings.ToLower(nilToEmpty(artifactSpec.Algorithm)) digest := strings.ToLower(nilToEmpty(artifactSpec.Digest)) var rv []*model.Artifact - artKeys, err := c.kv.Keys(ctx, artCol) - if err != nil { - return nil, err - } - for _, ak := range artKeys { - a, err := byKeykv[*artStruct](ctx, artCol, ak, c) + var done bool + scn := c.kv.Keys(artCol) + for !done { + var artKeys []string + artKeys, done, err = scn.Scan(ctx) if err != nil { return nil, err } - - matchAlgorithm := false - if algorithm == "" || algorithm == a.Algorithm { - matchAlgorithm = true - } - - matchDigest := false - if digest == "" || digest == a.Digest { - matchDigest = true - } - - if matchDigest && matchAlgorithm { - rv = append(rv, c.convArtifact(a)) + for _, ak := range artKeys { + a, err := byKeykv[*artStruct](ctx, artCol, ak, c) + if err != nil { + return nil, err + } + + matchAlgorithm := false + if algorithm == "" || algorithm == a.Algorithm { + matchAlgorithm = true + } + + matchDigest := false + if digest == "" || digest == a.Digest { + matchDigest = true + } + + if matchDigest && matchAlgorithm { + rv = append(rv, c.convArtifact(a)) + } } } return rv, nil diff --git a/pkg/assembler/backends/keyvalue/builder.go b/pkg/assembler/backends/keyvalue/builder.go index 672e47cb9e..69c5cf0044 100644 --- a/pkg/assembler/backends/keyvalue/builder.go +++ b/pkg/assembler/backends/keyvalue/builder.go @@ -124,16 +124,21 @@ func (c *demoClient) Builders(ctx context.Context, builderSpec *model.BuilderSpe return []*model.Builder{c.convBuilder(b)}, nil } var builders []*model.Builder - bKeys, err := c.kv.Keys(ctx, builderCol) - if err != nil { - return nil, err - } - for _, bk := range bKeys { - b, err := byKeykv[*builderStruct](ctx, builderCol, bk, c) + var done bool + scn := c.kv.Keys(builderCol) + for !done { + var bKeys []string + bKeys, done, err = scn.Scan(ctx) if err != nil { return nil, err } - builders = append(builders, c.convBuilder(b)) + for _, bk := range bKeys { + b, err := byKeykv[*builderStruct](ctx, builderCol, bk, c) + if err != nil { + return nil, err + } + builders = append(builders, c.convBuilder(b)) + } } return builders, nil } diff --git a/pkg/assembler/backends/keyvalue/certifyBad.go b/pkg/assembler/backends/keyvalue/certifyBad.go index a3698041f2..95c1df1347 100644 --- a/pkg/assembler/backends/keyvalue/certifyBad.go +++ b/pkg/assembler/backends/keyvalue/certifyBad.go @@ -244,18 +244,24 @@ func (c *demoClient) CertifyBad(ctx context.Context, filter *model.CertifyBadSpe } } } else { - cgKeys, err := c.kv.Keys(ctx, cbCol) - if err != nil { - return nil, err - } - for _, cgk := range cgKeys { - link, err := byKeykv[*badLink](ctx, cbCol, cgk, c) + var done bool + scn := c.kv.Keys(cbCol) + for !done { + var cgKeys []string + var err error + cgKeys, done, err = scn.Scan(ctx) if err != nil { return nil, err } - out, err = c.addCBIfMatch(ctx, out, filter, link) - if err != nil { - return nil, gqlerror.Errorf("%v :: %v", funcName, err) + for _, cgk := range cgKeys { + link, err := byKeykv[*badLink](ctx, cbCol, cgk, c) + if err != nil { + return nil, err + } + out, err = c.addCBIfMatch(ctx, out, filter, link) + if err != nil { + return nil, gqlerror.Errorf("%v :: %v", funcName, err) + } } } } diff --git a/pkg/assembler/backends/keyvalue/certifyGood.go b/pkg/assembler/backends/keyvalue/certifyGood.go index be46a11509..9ec9a9e5c6 100644 --- a/pkg/assembler/backends/keyvalue/certifyGood.go +++ b/pkg/assembler/backends/keyvalue/certifyGood.go @@ -243,18 +243,24 @@ func (c *demoClient) CertifyGood(ctx context.Context, filter *model.CertifyGoodS } } } else { - cgKeys, err := c.kv.Keys(ctx, cgCol) - if err != nil { - return nil, err - } - for _, cgk := range cgKeys { - link, err := byKeykv[*goodLink](ctx, cgCol, cgk, c) + var done bool + scn := c.kv.Keys(cgCol) + for !done { + var cgKeys []string + var err error + cgKeys, done, err = scn.Scan(ctx) if err != nil { return nil, err } - out, err = c.addCGIfMatch(ctx, out, filter, link) - if err != nil { - return nil, gqlerror.Errorf("%v :: %v", funcName, err) + for _, cgk := range cgKeys { + link, err := byKeykv[*goodLink](ctx, cgCol, cgk, c) + if err != nil { + return nil, err + } + out, err = c.addCGIfMatch(ctx, out, filter, link) + if err != nil { + return nil, gqlerror.Errorf("%v :: %v", funcName, err) + } } } } diff --git a/pkg/assembler/backends/keyvalue/certifyLegal.go b/pkg/assembler/backends/keyvalue/certifyLegal.go index 7a73668604..d6eba915af 100644 --- a/pkg/assembler/backends/keyvalue/certifyLegal.go +++ b/pkg/assembler/backends/keyvalue/certifyLegal.go @@ -343,18 +343,24 @@ func (c *demoClient) CertifyLegal(ctx context.Context, filter *model.CertifyLega } } } else { - clKeys, err := c.kv.Keys(ctx, clCol) - if err != nil { - return nil, err - } - for _, clk := range clKeys { - link, err := byKeykv[*certifyLegalStruct](ctx, clCol, clk, c) + var done bool + scn := c.kv.Keys(clCol) + for !done { + var clKeys []string + var err error + clKeys, done, err = scn.Scan(ctx) if err != nil { return nil, err } - out, err = c.addLegalIfMatch(ctx, out, filter, link) - if err != nil { - return nil, gqlerror.Errorf("%v :: %v", funcName, err) + for _, clk := range clKeys { + link, err := byKeykv[*certifyLegalStruct](ctx, clCol, clk, c) + if err != nil { + return nil, err + } + out, err = c.addLegalIfMatch(ctx, out, filter, link) + if err != nil { + return nil, gqlerror.Errorf("%v :: %v", funcName, err) + } } } } diff --git a/pkg/assembler/backends/keyvalue/certifyScorecard.go b/pkg/assembler/backends/keyvalue/certifyScorecard.go index ebfa36d74e..1c2e52a65d 100644 --- a/pkg/assembler/backends/keyvalue/certifyScorecard.go +++ b/pkg/assembler/backends/keyvalue/certifyScorecard.go @@ -183,18 +183,24 @@ func (c *demoClient) Scorecards(ctx context.Context, filter *model.CertifyScorec } } } else { - cscKeys, err := c.kv.Keys(ctx, cscCol) - if err != nil { - return nil, err - } - for _, csck := range cscKeys { - link, err := byKeykv[*scorecardLink](ctx, cscCol, csck, c) + var done bool + scn := c.kv.Keys(cscCol) + for !done { + var cscKeys []string + var err error + cscKeys, done, err = scn.Scan(ctx) if err != nil { return nil, err } - out, err = c.addSCIfMatch(ctx, out, filter, link) - if err != nil { - return nil, gqlerror.Errorf("%v :: %v", funcName, err) + for _, csck := range cscKeys { + link, err := byKeykv[*scorecardLink](ctx, cscCol, csck, c) + if err != nil { + return nil, err + } + out, err = c.addSCIfMatch(ctx, out, filter, link) + if err != nil { + return nil, gqlerror.Errorf("%v :: %v", funcName, err) + } } } } diff --git a/pkg/assembler/backends/keyvalue/certifyVEXStatement.go b/pkg/assembler/backends/keyvalue/certifyVEXStatement.go index 98576562b7..6a35329d46 100644 --- a/pkg/assembler/backends/keyvalue/certifyVEXStatement.go +++ b/pkg/assembler/backends/keyvalue/certifyVEXStatement.go @@ -254,18 +254,24 @@ func (c *demoClient) CertifyVEXStatement(ctx context.Context, filter *model.Cert } } } else { - keys, err := c.kv.Keys(ctx, cVEXCol) - if err != nil { - return nil, err - } - for _, key := range keys { - link, err := byKeykv[*vexLink](ctx, cVEXCol, key, c) + var done bool + scn := c.kv.Keys(cVEXCol) + for !done { + var keys []string + var err error + keys, done, err = scn.Scan(ctx) if err != nil { return nil, err } - out, err = c.addVexIfMatch(ctx, out, filter, link) - if err != nil { - return nil, gqlerror.Errorf("%v :: %v", funcName, err) + for _, key := range keys { + link, err := byKeykv[*vexLink](ctx, cVEXCol, key, c) + if err != nil { + return nil, err + } + out, err = c.addVexIfMatch(ctx, out, filter, link) + if err != nil { + return nil, gqlerror.Errorf("%v :: %v", funcName, err) + } } } } diff --git a/pkg/assembler/backends/keyvalue/certifyVuln.go b/pkg/assembler/backends/keyvalue/certifyVuln.go index c684cbb83a..0deb1e24be 100644 --- a/pkg/assembler/backends/keyvalue/certifyVuln.go +++ b/pkg/assembler/backends/keyvalue/certifyVuln.go @@ -227,18 +227,24 @@ func (c *demoClient) CertifyVuln(ctx context.Context, filter *model.CertifyVulnS } } } else { - keys, err := c.kv.Keys(ctx, cVulnCol) - if err != nil { - return nil, err - } - for _, key := range keys { - link, err := byKeykv[*certifyVulnerabilityLink](ctx, cVulnCol, key, c) + var done bool + scn := c.kv.Keys(cVulnCol) + for !done { + var keys []string + var err error + keys, done, err = scn.Scan(ctx) if err != nil { return nil, err } - out, err = c.addCVIfMatch(ctx, out, filter, link) - if err != nil { - return nil, gqlerror.Errorf("%v :: %v", funcName, err) + for _, key := range keys { + link, err := byKeykv[*certifyVulnerabilityLink](ctx, cVulnCol, key, c) + if err != nil { + return nil, err + } + out, err = c.addCVIfMatch(ctx, out, filter, link) + if err != nil { + return nil, gqlerror.Errorf("%v :: %v", funcName, err) + } } } } diff --git a/pkg/assembler/backends/keyvalue/hasMetadata.go b/pkg/assembler/backends/keyvalue/hasMetadata.go index dcaf3e5b23..2d20277e4c 100644 --- a/pkg/assembler/backends/keyvalue/hasMetadata.go +++ b/pkg/assembler/backends/keyvalue/hasMetadata.go @@ -253,18 +253,24 @@ func (c *demoClient) HasMetadata(ctx context.Context, filter *model.HasMetadataS } } } else { - hmk, err := c.kv.Keys(ctx, hasMDCol) - if err != nil { - return nil, err - } - for _, hk := range hmk { - link, err := byKeykv[*hasMetadataLink](ctx, hasMDCol, hk, c) + var done bool + scn := c.kv.Keys(hasMDCol) + for !done { + var hmk []string + var err error + hmk, done, err = scn.Scan(ctx) if err != nil { return nil, err } - out, err = c.addHMIfMatch(ctx, out, filter, link) - if err != nil { - return nil, gqlerror.Errorf("%v :: %v", funcName, err) + for _, hk := range hmk { + link, err := byKeykv[*hasMetadataLink](ctx, hasMDCol, hk, c) + if err != nil { + return nil, err + } + out, err = c.addHMIfMatch(ctx, out, filter, link) + if err != nil { + return nil, gqlerror.Errorf("%v :: %v", funcName, err) + } } } } diff --git a/pkg/assembler/backends/keyvalue/hasSBOM.go b/pkg/assembler/backends/keyvalue/hasSBOM.go index 2456f73fbf..66e4fbbbfe 100644 --- a/pkg/assembler/backends/keyvalue/hasSBOM.go +++ b/pkg/assembler/backends/keyvalue/hasSBOM.go @@ -356,18 +356,24 @@ func (c *demoClient) HasSBOM(ctx context.Context, filter *model.HasSBOMSpec) ([] } } } else { - hsks, err := c.kv.Keys(ctx, hasSBOMCol) - if err != nil { - return nil, err - } - for _, hsk := range hsks { - link, err := byKeykv[*hasSBOMStruct](ctx, hasSBOMCol, hsk, c) + var done bool + scn := c.kv.Keys(hasSBOMCol) + for !done { + var hsks []string + var err error + hsks, done, err = scn.Scan(ctx) if err != nil { return nil, err } - out, err = c.addHasSBOMIfMatch(ctx, out, filter, link) - if err != nil { - return nil, gqlerror.Errorf("%v :: %v", funcName, err) + for _, hsk := range hsks { + link, err := byKeykv[*hasSBOMStruct](ctx, hasSBOMCol, hsk, c) + if err != nil { + return nil, err + } + out, err = c.addHasSBOMIfMatch(ctx, out, filter, link) + if err != nil { + return nil, gqlerror.Errorf("%v :: %v", funcName, err) + } } } } diff --git a/pkg/assembler/backends/keyvalue/hasSLSA.go b/pkg/assembler/backends/keyvalue/hasSLSA.go index cdb11a008b..cd7b2235eb 100644 --- a/pkg/assembler/backends/keyvalue/hasSLSA.go +++ b/pkg/assembler/backends/keyvalue/hasSLSA.go @@ -151,18 +151,24 @@ func (c *demoClient) HasSlsa(ctx context.Context, filter *model.HasSLSASpec) ([] } } } else { - slsaKeys, err := c.kv.Keys(ctx, slsaCol) - if err != nil { - return nil, err - } - for _, slsak := range slsaKeys { - link, err := byKeykv[*hasSLSAStruct](ctx, slsaCol, slsak, c) + var done bool + scn := c.kv.Keys(slsaCol) + for !done { + var slsaKeys []string + var err error + slsaKeys, done, err = scn.Scan(ctx) if err != nil { return nil, err } - out, err = c.addSLSAIfMatch(ctx, out, filter, link) - if err != nil { - return nil, gqlerror.Errorf("%v :: %v", funcName, err) + for _, slsak := range slsaKeys { + link, err := byKeykv[*hasSLSAStruct](ctx, slsaCol, slsak, c) + if err != nil { + return nil, err + } + out, err = c.addSLSAIfMatch(ctx, out, filter, link) + if err != nil { + return nil, gqlerror.Errorf("%v :: %v", funcName, err) + } } } } diff --git a/pkg/assembler/backends/keyvalue/hasSourceAt.go b/pkg/assembler/backends/keyvalue/hasSourceAt.go index 60b53d96c7..0d1a21a924 100644 --- a/pkg/assembler/backends/keyvalue/hasSourceAt.go +++ b/pkg/assembler/backends/keyvalue/hasSourceAt.go @@ -189,18 +189,24 @@ func (c *demoClient) HasSourceAt(ctx context.Context, filter *model.HasSourceAtS } } } else { - hsaKeys, err := c.kv.Keys(ctx, hsaCol) - if err != nil { - return nil, err - } - for _, hsak := range hsaKeys { - link, err := byKeykv[*srcMapLink](ctx, hsaCol, hsak, c) + var done bool + scn := c.kv.Keys(hsaCol) + for !done { + var hsaKeys []string + var err error + hsaKeys, done, err = scn.Scan(ctx) if err != nil { return nil, err } - out, err = c.addSrcIfMatch(ctx, out, filter, link) - if err != nil { - return nil, gqlerror.Errorf("%v :: %v", funcName, err) + for _, hsak := range hsaKeys { + link, err := byKeykv[*srcMapLink](ctx, hsaCol, hsak, c) + if err != nil { + return nil, err + } + out, err = c.addSrcIfMatch(ctx, out, filter, link) + if err != nil { + return nil, gqlerror.Errorf("%v :: %v", funcName, err) + } } } } diff --git a/pkg/assembler/backends/keyvalue/hashEqual.go b/pkg/assembler/backends/keyvalue/hashEqual.go index ca84cf1bbd..1a359e63ce 100644 --- a/pkg/assembler/backends/keyvalue/hashEqual.go +++ b/pkg/assembler/backends/keyvalue/hashEqual.go @@ -227,18 +227,24 @@ func (c *demoClient) HashEqual(ctx context.Context, filter *model.HashEqualSpec) } } } else { - heKeys, err := c.kv.Keys(ctx, hashEqCol) - if err != nil { - return nil, err - } - for _, hek := range heKeys { - link, err := byKeykv[*hashEqualStruct](ctx, hashEqCol, hek, c) + var done bool + scn := c.kv.Keys(hashEqCol) + for !done { + var heKeys []string + var err error + heKeys, done, err = scn.Scan(ctx) if err != nil { return nil, err } - out, err = c.addHEIfMatch(ctx, out, filter, link) - if err != nil { - return nil, gqlerror.Errorf("%v :: %v", funcName, err) + for _, hek := range heKeys { + link, err := byKeykv[*hashEqualStruct](ctx, hashEqCol, hek, c) + if err != nil { + return nil, err + } + out, err = c.addHEIfMatch(ctx, out, filter, link) + if err != nil { + return nil, gqlerror.Errorf("%v :: %v", funcName, err) + } } } } diff --git a/pkg/assembler/backends/keyvalue/isDependency.go b/pkg/assembler/backends/keyvalue/isDependency.go index 818cd9f1b2..ac4995a5fb 100644 --- a/pkg/assembler/backends/keyvalue/isDependency.go +++ b/pkg/assembler/backends/keyvalue/isDependency.go @@ -211,18 +211,24 @@ func (c *demoClient) IsDependency(ctx context.Context, filter *model.IsDependenc } } } else { - depKeys, err := c.kv.Keys(ctx, isDepCol) - if err != nil { - return nil, err - } - for _, depKey := range depKeys { - link, err := byKeykv[*isDependencyLink](ctx, isDepCol, depKey, c) + var done bool + scn := c.kv.Keys(isDepCol) + for !done { + var depKeys []string + var err error + depKeys, done, err = scn.Scan(ctx) if err != nil { return nil, err } - out, err = c.addDepIfMatch(ctx, out, filter, link) - if err != nil { - return nil, gqlerror.Errorf("%v :: %v", funcName, err) + for _, depKey := range depKeys { + link, err := byKeykv[*isDependencyLink](ctx, isDepCol, depKey, c) + if err != nil { + return nil, err + } + out, err = c.addDepIfMatch(ctx, out, filter, link) + if err != nil { + return nil, gqlerror.Errorf("%v :: %v", funcName, err) + } } } } diff --git a/pkg/assembler/backends/keyvalue/isOccurrence.go b/pkg/assembler/backends/keyvalue/isOccurrence.go index f1f7998d7f..6bc9e6de63 100644 --- a/pkg/assembler/backends/keyvalue/isOccurrence.go +++ b/pkg/assembler/backends/keyvalue/isOccurrence.go @@ -293,18 +293,24 @@ func (c *demoClient) IsOccurrence(ctx context.Context, filter *model.IsOccurrenc } } } else { - occKeys, err := c.kv.Keys(ctx, occCol) - if err != nil { - return nil, err - } - for _, ok := range occKeys { - link, err := byKeykv[*isOccurrenceStruct](ctx, occCol, ok, c) + var done bool + scn := c.kv.Keys(occCol) + for !done { + var occKeys []string + var err error + occKeys, done, err = scn.Scan(ctx) if err != nil { return nil, err } - out, err = c.addOccIfMatch(ctx, out, filter, link) - if err != nil { - return nil, gqlerror.Errorf("%v :: %v", funcName, err) + for _, ok := range occKeys { + link, err := byKeykv[*isOccurrenceStruct](ctx, occCol, ok, c) + if err != nil { + return nil, err + } + out, err = c.addOccIfMatch(ctx, out, filter, link) + if err != nil { + return nil, gqlerror.Errorf("%v :: %v", funcName, err) + } } } } diff --git a/pkg/assembler/backends/keyvalue/license.go b/pkg/assembler/backends/keyvalue/license.go index c0dc3f5ff2..d64d170ba3 100644 --- a/pkg/assembler/backends/keyvalue/license.go +++ b/pkg/assembler/backends/keyvalue/license.go @@ -169,21 +169,26 @@ func (c *demoClient) Licenses(ctx context.Context, licenseSpec *model.LicenseSpe } var rv []*model.License - lKeys, err := c.kv.Keys(ctx, licenseCol) - if err != nil { - return nil, err - } - for _, lk := range lKeys { - l, err := byKeykv[*licStruct](ctx, licenseCol, lk, c) + var done bool + scn := c.kv.Keys(licenseCol) + for !done { + var lKeys []string + lKeys, done, err = scn.Scan(ctx) if err != nil { return nil, err } - if noMatch(licenseSpec.Name, l.Name) || - noMatch(licenseSpec.ListVersion, l.ListVersion) || - noMatch(licenseSpec.Inline, l.Inline) { - continue + for _, lk := range lKeys { + l, err := byKeykv[*licStruct](ctx, licenseCol, lk, c) + if err != nil { + return nil, err + } + if noMatch(licenseSpec.Name, l.Name) || + noMatch(licenseSpec.ListVersion, l.ListVersion) || + noMatch(licenseSpec.Inline, l.Inline) { + continue + } + rv = append(rv, c.convLicense(l)) } - rv = append(rv, c.convLicense(l)) } return rv, nil } diff --git a/pkg/assembler/backends/keyvalue/pkg.go b/pkg/assembler/backends/keyvalue/pkg.go index 4840e725fa..a52b6be42d 100644 --- a/pkg/assembler/backends/keyvalue/pkg.go +++ b/pkg/assembler/backends/keyvalue/pkg.go @@ -528,22 +528,28 @@ func (c *demoClient) Packages(ctx context.Context, filter *model.PkgSpec) ([]*mo } } } else { - typeKeys, err := c.kv.Keys(ctx, pkgTypeCol) - if err != nil { - return nil, err - } - for _, tk := range typeKeys { - pkgTypeNode, err := byKeykv[*pkgType](ctx, pkgTypeCol, tk, c) + var done bool + scn := c.kv.Keys(pkgTypeCol) + for !done { + var typeKeys []string + var err error + typeKeys, done, err = scn.Scan(ctx) if err != nil { return nil, err } - pNamespaces := c.buildPkgNamespace(ctx, pkgTypeNode, filter) - if len(pNamespaces) > 0 { - out = append(out, &model.Package{ - ID: pkgTypeNode.ThisID, - Type: pkgTypeNode.Type, - Namespaces: pNamespaces, - }) + for _, tk := range typeKeys { + pkgTypeNode, err := byKeykv[*pkgType](ctx, pkgTypeCol, tk, c) + if err != nil { + return nil, err + } + pNamespaces := c.buildPkgNamespace(ctx, pkgTypeNode, filter) + if len(pNamespaces) > 0 { + out = append(out, &model.Package{ + ID: pkgTypeNode.ThisID, + Type: pkgTypeNode.Type, + Namespaces: pNamespaces, + }) + } } } } diff --git a/pkg/assembler/backends/keyvalue/pkgEqual.go b/pkg/assembler/backends/keyvalue/pkgEqual.go index a428f43214..2ad3409740 100644 --- a/pkg/assembler/backends/keyvalue/pkgEqual.go +++ b/pkg/assembler/backends/keyvalue/pkgEqual.go @@ -191,18 +191,24 @@ func (c *demoClient) PkgEqual(ctx context.Context, filter *model.PkgEqualSpec) ( } } } else { - peKeys, err := c.kv.Keys(ctx, pkgEqCol) - if err != nil { - return nil, err - } - for _, pek := range peKeys { - link, err := byKeykv[*pkgEqualStruct](ctx, pkgEqCol, pek, c) + var done bool + scn := c.kv.Keys(pkgEqCol) + for !done { + var peKeys []string + var err error + peKeys, done, err = scn.Scan(ctx) if err != nil { return nil, err } - out, err = c.addCPIfMatch(ctx, out, filter, link) - if err != nil { - return nil, gqlerror.Errorf("%v :: %v", funcName, err) + for _, pek := range peKeys { + link, err := byKeykv[*pkgEqualStruct](ctx, pkgEqCol, pek, c) + if err != nil { + return nil, err + } + out, err = c.addCPIfMatch(ctx, out, filter, link) + if err != nil { + return nil, gqlerror.Errorf("%v :: %v", funcName, err) + } } } } diff --git a/pkg/assembler/backends/keyvalue/pointOfContact.go b/pkg/assembler/backends/keyvalue/pointOfContact.go index b5431200ef..0f65da5eed 100644 --- a/pkg/assembler/backends/keyvalue/pointOfContact.go +++ b/pkg/assembler/backends/keyvalue/pointOfContact.go @@ -251,18 +251,24 @@ func (c *demoClient) PointOfContact(ctx context.Context, filter *model.PointOfCo } } } else { - pocKeys, err := c.kv.Keys(ctx, pocCol) - if err != nil { - return nil, err - } - for _, pk := range pocKeys { - link, err := byKeykv[*pointOfContactLink](ctx, pocCol, pk, c) + var done bool + scn := c.kv.Keys(pocCol) + for !done { + var pocKeys []string + var err error + pocKeys, done, err = scn.Scan(ctx) if err != nil { return nil, err } - out, err = c.addPOCIfMatch(ctx, out, filter, link) - if err != nil { - return nil, gqlerror.Errorf("%v :: %v", funcName, err) + for _, pk := range pocKeys { + link, err := byKeykv[*pointOfContactLink](ctx, pocCol, pk, c) + if err != nil { + return nil, err + } + out, err = c.addPOCIfMatch(ctx, out, filter, link) + if err != nil { + return nil, gqlerror.Errorf("%v :: %v", funcName, err) + } } } } diff --git a/pkg/assembler/backends/keyvalue/src.go b/pkg/assembler/backends/keyvalue/src.go index 64f68ba606..96d8ce9a84 100644 --- a/pkg/assembler/backends/keyvalue/src.go +++ b/pkg/assembler/backends/keyvalue/src.go @@ -332,22 +332,28 @@ func (c *demoClient) Sources(ctx context.Context, filter *model.SourceSpec) ([]* } } } else { - typeKeys, err := c.kv.Keys(ctx, srcTypeCol) - if err != nil { - return nil, err - } - for _, tk := range typeKeys { - srcTypeNode, err := byKeykv[*srcType](ctx, srcTypeCol, tk, c) + var done bool + scn := c.kv.Keys(srcTypeCol) + for !done { + var typeKeys []string + var err error + typeKeys, done, err = scn.Scan(ctx) if err != nil { return nil, err } - sNamespaces := c.buildSourceNamespace(ctx, srcTypeNode, filter) - if len(sNamespaces) > 0 { - out = append(out, &model.Source{ - ID: srcTypeNode.ThisID, - Type: srcTypeNode.Type, - Namespaces: sNamespaces, - }) + for _, tk := range typeKeys { + srcTypeNode, err := byKeykv[*srcType](ctx, srcTypeCol, tk, c) + if err != nil { + return nil, err + } + sNamespaces := c.buildSourceNamespace(ctx, srcTypeNode, filter) + if len(sNamespaces) > 0 { + out = append(out, &model.Source{ + ID: srcTypeNode.ThisID, + Type: srcTypeNode.Type, + Namespaces: sNamespaces, + }) + } } } } diff --git a/pkg/assembler/backends/keyvalue/vulnEqual.go b/pkg/assembler/backends/keyvalue/vulnEqual.go index e70054f219..c1497cc8a8 100644 --- a/pkg/assembler/backends/keyvalue/vulnEqual.go +++ b/pkg/assembler/backends/keyvalue/vulnEqual.go @@ -196,18 +196,24 @@ func (c *demoClient) VulnEqual(ctx context.Context, filter *model.VulnEqualSpec) } } } else { - veKeys, err := c.kv.Keys(ctx, vulnEqCol) - if err != nil { - return nil, err - } - for _, vek := range veKeys { - link, err := byKeykv[*vulnerabilityEqualLink](ctx, vulnEqCol, vek, c) + var done bool + scn := c.kv.Keys(vulnEqCol) + for !done { + var veKeys []string + var err error + veKeys, done, err = scn.Scan(ctx) if err != nil { return nil, err } - out, err = c.addVulnIfMatch(ctx, out, filter, link) - if err != nil { - return nil, gqlerror.Errorf("%v :: %v", funcName, err) + for _, vek := range veKeys { + link, err := byKeykv[*vulnerabilityEqualLink](ctx, vulnEqCol, vek, c) + if err != nil { + return nil, err + } + out, err = c.addVulnIfMatch(ctx, out, filter, link) + if err != nil { + return nil, gqlerror.Errorf("%v :: %v", funcName, err) + } } } } diff --git a/pkg/assembler/backends/keyvalue/vulnMetadata.go b/pkg/assembler/backends/keyvalue/vulnMetadata.go index a4d5ff633c..abecf5417b 100644 --- a/pkg/assembler/backends/keyvalue/vulnMetadata.go +++ b/pkg/assembler/backends/keyvalue/vulnMetadata.go @@ -174,18 +174,24 @@ func (c *demoClient) VulnerabilityMetadata(ctx context.Context, filter *model.Vu } } } else { - vmdKeys, err := c.kv.Keys(ctx, vulnMDCol) - if err != nil { - return nil, err - } - for _, vmdk := range vmdKeys { - link, err := byKeykv[*vulnerabilityMetadataLink](ctx, vulnMDCol, vmdk, c) + var done bool + scn := c.kv.Keys(vulnMDCol) + for !done { + var vmdKeys []string + var err error + vmdKeys, done, err = scn.Scan(ctx) if err != nil { return nil, err } - out, err = c.addVulnMetadataMatch(ctx, out, filter, link) - if err != nil { - return nil, gqlerror.Errorf("%v :: %v", funcName, err) + for _, vmdk := range vmdKeys { + link, err := byKeykv[*vulnerabilityMetadataLink](ctx, vulnMDCol, vmdk, c) + if err != nil { + return nil, err + } + out, err = c.addVulnMetadataMatch(ctx, out, filter, link) + if err != nil { + return nil, gqlerror.Errorf("%v :: %v", funcName, err) + } } } } diff --git a/pkg/assembler/backends/keyvalue/vulnerability.go b/pkg/assembler/backends/keyvalue/vulnerability.go index cd99da5370..d30f7b0542 100644 --- a/pkg/assembler/backends/keyvalue/vulnerability.go +++ b/pkg/assembler/backends/keyvalue/vulnerability.go @@ -250,22 +250,28 @@ func (c *demoClient) Vulnerabilities(ctx context.Context, filter *model.Vulnerab } } } else { - typeKeys, err := c.kv.Keys(ctx, vulnTypeCol) - if err != nil { - return nil, err - } - for _, tk := range typeKeys { - typeStruct, err := byKeykv[*vulnTypeStruct](ctx, vulnTypeCol, tk, c) + var done bool + scn := c.kv.Keys(vulnTypeCol) + for !done { + var typeKeys []string + var err error + typeKeys, done, err = scn.Scan(ctx) if err != nil { return nil, err } - vulnIDs := c.buildVulnID(ctx, typeStruct, filter) - if len(vulnIDs) > 0 { - out = append(out, &model.Vulnerability{ - ID: typeStruct.ThisID, - Type: typeStruct.Type, - VulnerabilityIDs: vulnIDs, - }) + for _, tk := range typeKeys { + typeStruct, err := byKeykv[*vulnTypeStruct](ctx, vulnTypeCol, tk, c) + if err != nil { + return nil, err + } + vulnIDs := c.buildVulnID(ctx, typeStruct, filter) + if len(vulnIDs) > 0 { + out = append(out, &model.Vulnerability{ + ID: typeStruct.ThisID, + Type: typeStruct.Type, + VulnerabilityIDs: vulnIDs, + }) + } } } } diff --git a/pkg/assembler/kv/kv.go b/pkg/assembler/kv/kv.go index 1a7dc2dd1c..95391e589f 100644 --- a/pkg/assembler/kv/kv.go +++ b/pkg/assembler/kv/kv.go @@ -31,10 +31,8 @@ type Store interface { // Sets a value, creates collection if necessary Set(ctx context.Context, collection, key string, value any) error - // Returns a slice of all keys for a collection. If collection does not - // exist, return a nil slice. - // TODO(jeffmendoza) implement scanning in kv interface - Keys(ctx context.Context, collection string) ([]string, error) + // Create a scanner that will be used to get all the keys in a collection. + Keys(collection string) Scanner } // Error to return (wrap) on Get if value not found @@ -43,3 +41,14 @@ var NotFoundError = errors.New("Not found") // Error to return (wrap) on Get if Ptr is not a pointer, or not the right // type. var BadPtrError = errors.New("Bad pointer") + +// Scanner is used to get all the keys for a collection. The concrete +// implementation will store any intermediate cursors or last key data so that +// the next call to Scan will pick up where the last one left off. Each +// instance will only be used once. +type Scanner interface { + + // Scan returns some number of keys. If the collection does not exist, return + // a nil slice. If there are no more keys, return true as end signal. + Scan(ctx context.Context) ([]string, bool, error) +} diff --git a/pkg/assembler/kv/memmap/memmap.go b/pkg/assembler/kv/memmap/memmap.go index 9a1677be5f..ff49d6338a 100644 --- a/pkg/assembler/kv/memmap/memmap.go +++ b/pkg/assembler/kv/memmap/memmap.go @@ -24,17 +24,18 @@ import ( "golang.org/x/exp/maps" ) -type Store struct { +type store struct { m map[string]map[string]any } +// GetStore returns a kv.Store that stores all data in an internal go map. func GetStore() kv.Store { - return &Store{ + return &store{ m: make(map[string]map[string]any), } } -func (s *Store) Get(_ context.Context, c, k string, v any) error { +func (s *store) Get(_ context.Context, c, k string, v any) error { col, ok := s.m[c] if !ok { return fmt.Errorf("%w : Collection %q", kv.NotFoundError, c) @@ -47,7 +48,7 @@ func (s *Store) Get(_ context.Context, c, k string, v any) error { return copyAny(val, v) } -func (s *Store) Set(_ context.Context, c, k string, v any) error { +func (s *store) Set(_ context.Context, c, k string, v any) error { if s.m[c] == nil { s.m[c] = make(map[string]any) } @@ -55,11 +56,12 @@ func (s *Store) Set(_ context.Context, c, k string, v any) error { return nil } -func (s *Store) Keys(_ context.Context, c string) ([]string, error) { - if s.m[c] == nil { - return nil, nil +func (s *store) Keys(c string) kv.Scanner { + return &scanner{ + collection: c, + done: false, + store: s, } - return maps.Keys(s.m[c]), nil } func copyAny(src any, dst any) error { @@ -80,3 +82,20 @@ func copyAny(src any, dst any) error { d.Set(s) return nil } + +type scanner struct { + collection string + done bool + store *store +} + +func (s *scanner) Scan(_ context.Context) ([]string, bool, error) { + if s.done { + return nil, true, nil + } + s.done = true + if s.store.m[s.collection] == nil { + return nil, true, nil + } + return maps.Keys(s.store.m[s.collection]), true, nil +} diff --git a/pkg/assembler/kv/redis/redis.go b/pkg/assembler/kv/redis/redis.go index b995f08206..27a5c11e91 100644 --- a/pkg/assembler/kv/redis/redis.go +++ b/pkg/assembler/kv/redis/redis.go @@ -26,7 +26,9 @@ import ( var json = jsoniter.ConfigFastest -type Store struct { +const count = 1000 + +type store struct { c *redis.Client } @@ -39,12 +41,12 @@ func GetStore(s string) (kv.Store, error) { return nil, err } - return &Store{ + return &store{ c: redis.NewClient(opt), }, nil } -func (s *Store) Get(ctx context.Context, c, k string, v any) error { +func (s *store) Get(ctx context.Context, c, k string, v any) error { j, err := s.c.HGet(ctx, c, k).Result() // TODO(jeffmendoza), should figure out error type and check it, instead just see if // string is empty for now. @@ -57,7 +59,7 @@ func (s *Store) Get(ctx context.Context, c, k string, v any) error { return json.Unmarshal([]byte(j), v) } -func (s *Store) Set(ctx context.Context, c, k string, v any) error { +func (s *store) Set(ctx context.Context, c, k string, v any) error { b, err := json.Marshal(v) if err != nil { return err @@ -65,7 +67,33 @@ func (s *Store) Set(ctx context.Context, c, k string, v any) error { return s.c.HSet(ctx, c, k, string(b)).Err() } -func (s *Store) Keys(ctx context.Context, c string) ([]string, error) { - // TODO(jeffmendoza) implement scanning in kv interface, use "Keys" for now. - return s.c.HKeys(ctx, c).Result() +func (s *store) Keys(c string) kv.Scanner { + return &scanner{ + collection: c, + done: false, + cursor: 0, + c: s.c, + } +} + +type scanner struct { + collection string + done bool + cursor uint64 + c *redis.Client +} + +func (s *scanner) Scan(ctx context.Context) ([]string, bool, error) { + if s.done { + return nil, true, nil + } + rv, newCur, err := s.c.HScan(ctx, s.collection, s.cursor, "", count).Result() + if err != nil { + return nil, false, err + } + s.cursor = newCur + if newCur == 0 { + s.done = true + } + return rv, s.done, nil } diff --git a/pkg/assembler/kv/tikv/tikv.go b/pkg/assembler/kv/tikv/tikv.go index 34e0ff1649..319d1ee673 100644 --- a/pkg/assembler/kv/tikv/tikv.go +++ b/pkg/assembler/kv/tikv/tikv.go @@ -16,6 +16,7 @@ package tikv import ( + "bytes" "context" "strings" @@ -28,7 +29,9 @@ import ( var json = jsoniter.ConfigFastest -type Store struct { +const count = 1000 + +type store struct { c *rawkv.Client } @@ -38,12 +41,12 @@ func GetStore(ctx context.Context, s string) (kv.Store, error) { if err != nil { return nil, err } - return &Store{ + return &store{ c: c, }, nil } -func (s *Store) Get(ctx context.Context, c, k string, v any) error { +func (s *store) Get(ctx context.Context, c, k string, v any) error { ck := strings.Join([]string{c, k}, ":") bts, err := s.c.Get(ctx, []byte(ck)) // TODO(jeffmendoza), should figure out error type and check it, instead just see if @@ -57,7 +60,7 @@ func (s *Store) Get(ctx context.Context, c, k string, v any) error { return json.Unmarshal(bts, v) } -func (s *Store) Set(ctx context.Context, c, k string, v any) error { +func (s *store) Set(ctx context.Context, c, k string, v any) error { ck := strings.Join([]string{c, k}, ":") bts, err := json.Marshal(v) if err != nil { @@ -66,15 +69,44 @@ func (s *Store) Set(ctx context.Context, c, k string, v any) error { return s.c.Put(ctx, []byte(ck), bts) } -func (s *Store) Keys(ctx context.Context, c string) ([]string, error) { - // TODO(jeffmendoza) implement scanning in kv interface, use 1000 for now. - ks, _, err := s.c.Scan(ctx, []byte(c), kvti.PrefixNextKey([]byte(c)), 10000, rawkv.ScanKeyOnly()) +func (s *store) Keys(c string) kv.Scanner { + return &scanner{ + c: s.c, + done: false, + curKey: []byte(c), + endKey: kvti.PrefixNextKey([]byte(c)), + } +} + +type scanner struct { + c *rawkv.Client + done bool + curKey []byte + endKey []byte +} + +func (s *scanner) Scan(ctx context.Context) ([]string, bool, error) { + if s.done { + return nil, true, nil + } + ks, _, err := s.c.Scan(ctx, s.curKey, s.endKey, count, rawkv.ScanKeyOnly()) if err != nil { - return nil, err + return nil, false, err + } + if len(ks) < count { + s.done = true + } + if len(ks) == 0 { + return nil, true, nil } rv := make([]string, len(ks)) + var largest []byte for i, k := range ks { + if bytes.Compare(k, largest) > 0 { + largest = k + } rv[i] = string(k) } - return rv, nil + s.curKey = kvti.NextKey(largest) + return rv, s.done, nil }