From 7062828412095ee56c27638c80972f5f9813c43a Mon Sep 17 00:00:00 2001 From: Mikhail Swift Date: Wed, 13 Jul 2022 17:29:33 -0400 Subject: [PATCH] feat: object streaming Stream attestations to and from the collection service. grpc has a default message size of 4MB and (from my limited research) isn't suited well for large single messages. Instead opting to stream many messages is preferred. Signed-off-by: Mikhail Swift --- cmd/archivist/main.go | 10 +- cmd/archivistctl/cmd/retrieve.go | 19 ++- cmd/archivistctl/cmd/store.go | 58 +++++++-- go.mod | 4 +- go.sum | 4 +- internal/metadatastorage/mysqlstore/mysql.go | 83 +++++-------- internal/objectstorage/blobstore/minio.go | 95 ++------------- internal/objectstorage/filestore/file.go | 45 ++----- internal/server/server.go | 119 ++++++++++++++++--- 9 files changed, 220 insertions(+), 217 deletions(-) diff --git a/cmd/archivist/main.go b/cmd/archivist/main.go index 2adc50ba..45a444f3 100644 --- a/cmd/archivist/main.go +++ b/cmd/archivist/main.go @@ -31,7 +31,7 @@ import ( "github.com/testifysec/archivist-api/pkg/api/archivist" "github.com/testifysec/archivist/internal/config" "github.com/testifysec/archivist/internal/metadatastorage/mysqlstore" - "github.com/testifysec/archivist/internal/objectstorage/blobstore" + blob "github.com/testifysec/archivist/internal/objectstorage/blobstore" "github.com/testifysec/archivist/internal/objectstorage/filestore" "github.com/testifysec/archivist/internal/server" @@ -105,7 +105,7 @@ func main() { logrus.Fatalf("error initializing storage clients: %+v", err) } - mysqlStore, mysqlStoreCh, err := mysqlstore.NewServer(ctx, cfg.SQLStoreConnectionString) + mysqlStore, mysqlStoreCh, err := mysqlstore.New(ctx, cfg.SQLStoreConnectionString) log.FromContext(ctx).WithField("duration", time.Since(now)).Infof("completed phase 3: initializing storage clients") // ******************************************************************************** @@ -166,13 +166,13 @@ func initSpiffeConnection(ctx context.Context, cfg *config.Config) []grpc.Server return opts } -func initObjectStore(ctx context.Context, cfg *config.Config) (archivist.CollectorServer, <-chan error, error) { +func initObjectStore(ctx context.Context, cfg *config.Config) (server.ObjectStorer, <-chan error, error) { switch strings.ToUpper(cfg.StorageBackend) { case "FILE": - return filestore.NewServer(ctx, cfg.FileDir, cfg.FileServeOn) + return filestore.New(ctx, cfg.FileDir, cfg.FileServeOn) case "BLOB": - return blob.NewMinioClient( + return blob.New( ctx, cfg.BlobStoreEndpoint, cfg.BlobStoreAccessKeyId, diff --git a/cmd/archivistctl/cmd/retrieve.go b/cmd/archivistctl/cmd/retrieve.go index 7835832f..ef348ed0 100644 --- a/cmd/archivistctl/cmd/retrieve.go +++ b/cmd/archivistctl/cmd/retrieve.go @@ -1,10 +1,10 @@ package cmd import ( + "bytes" "context" "io" "os" - "strings" "github.com/spf13/cobra" "github.com/testifysec/archivist-api/pkg/api/archivist" @@ -46,13 +46,24 @@ func init() { } func retrieveEnvelope(ctx context.Context, client archivist.CollectorClient, gitoid string, out io.Writer) error { - resp, err := client.Get(ctx, &archivist.GetRequest{Gitoid: gitoid}) + stream, err := client.Get(ctx, &archivist.GetRequest{Gitoid: gitoid}) if err != nil { return err } - if _, err := io.Copy(out, strings.NewReader(resp.Object)); err != nil { - return err + for { + chunk, err := stream.Recv() + if err == io.EOF { + break + } + + if err != nil { + return err + } + + if _, err := io.Copy(out, bytes.NewReader(chunk.GetChunk())); err != nil { + return err + } } return nil diff --git a/cmd/archivistctl/cmd/store.go b/cmd/archivistctl/cmd/store.go index 713e1557..177c9773 100644 --- a/cmd/archivistctl/cmd/store.go +++ b/cmd/archivistctl/cmd/store.go @@ -9,6 +9,7 @@ import ( "github.com/spf13/cobra" "github.com/testifysec/archivist-api/pkg/api/archivist" + "github.com/testifysec/archivist/internal/server" "github.com/testifysec/go-witness/dsse" ) @@ -17,7 +18,7 @@ var ( Use: "store", Short: "stores an attestation on the archivist server", SilenceUsage: true, - Args: cobra.ExactArgs(1), + Args: cobra.MinimumNArgs(1), RunE: func(cmd *cobra.Command, args []string) error { conn, err := newConn(archivistUrl) defer conn.Close() @@ -25,13 +26,15 @@ var ( return err } - file, err := os.Open(args[0]) - defer file.Close() - if err != nil { - return err + for _, filePath := range args { + if gitoid, err := storeAttestationByPath(cmd.Context(), archivist.NewCollectorClient(conn), filePath); err != nil { + return fmt.Errorf("failed to store %s: %w", filePath, err) + } else { + fmt.Printf("%s stored with gitoid %s\n", filePath, gitoid) + } } - return storeAttestation(cmd.Context(), archivist.NewCollectorClient(conn), file) + return nil }, } ) @@ -40,24 +43,53 @@ func init() { rootCmd.AddCommand(storeCmd) } -func storeAttestation(ctx context.Context, client archivist.CollectorClient, envelope io.Reader) error { +func storeAttestationByPath(ctx context.Context, client archivist.CollectorClient, path string) (string, error) { + file, err := os.Open(path) + defer file.Close() + if err != nil { + return "", err + } + + return storeAttestation(ctx, client, file) +} + +func storeAttestation(ctx context.Context, client archivist.CollectorClient, envelope io.Reader) (string, error) { objBytes, err := io.ReadAll(envelope) if err != nil { - return err + return "", err } obj := &dsse.Envelope{} if err := json.Unmarshal(objBytes, &obj); err != nil { - return err + return "", err } if len(obj.Payload) == 0 || obj.PayloadType == "" || len(obj.Signatures) == 0 { - return fmt.Errorf("obj is not DSSE %d %d %d", len(obj.Payload), len(obj.PayloadType), len(obj.Signatures)) + return "", fmt.Errorf("obj is not DSSE %d %d %d", len(obj.Payload), len(obj.PayloadType), len(obj.Signatures)) } - if _, err := client.Store(ctx, &archivist.StoreRequest{Object: string(objBytes)}); err != nil { - return err + stream, err := client.Store(ctx) + if err != nil { + return "", err + } + + chunk := &archivist.Chunk{} + for curr := 0; curr < len(objBytes); curr += server.ChunkSize { + if curr+server.ChunkSize > len(objBytes) { + chunk.Chunk = objBytes[curr:] + } else { + chunk.Chunk = objBytes[curr : curr+server.ChunkSize] + } + + if err := stream.Send(chunk); err != nil { + return "", err + } + } + + resp, err := stream.CloseAndRecv() + if err != nil { + return "", err } - return nil + return resp.GetGitoid(), nil } diff --git a/go.mod b/go.mod index 4a40c7c8..40d495fe 100644 --- a/go.mod +++ b/go.mod @@ -15,10 +15,9 @@ require ( github.com/sirupsen/logrus v1.8.1 github.com/spf13/cobra v1.4.0 github.com/spiffe/go-spiffe/v2 v2.0.0 - github.com/testifysec/archivist-api v0.0.0-20220707182002-b803369e93a4 + github.com/testifysec/archivist-api v0.0.0-20220719182705-980989009502 github.com/testifysec/go-witness v0.1.11 google.golang.org/grpc v1.46.0 - google.golang.org/protobuf v1.28.0 ) require ( @@ -65,5 +64,6 @@ require ( golang.org/x/sys v0.0.0-20220412211240-33da011f77ad // indirect golang.org/x/text v0.3.7 // indirect google.golang.org/genproto v0.0.0-20220222213610-43724f9ea8cf // indirect + google.golang.org/protobuf v1.28.0 // indirect gopkg.in/square/go-jose.v2 v2.6.0 // indirect ) diff --git a/go.sum b/go.sum index fd7a2947..ca3b76a7 100644 --- a/go.sum +++ b/go.sum @@ -272,8 +272,8 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5 github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= -github.com/testifysec/archivist-api v0.0.0-20220707182002-b803369e93a4 h1:osCfJqj8ohaHzxY7Ssn1XMUvUiLAUUSY8F0jYqgvUTA= -github.com/testifysec/archivist-api v0.0.0-20220707182002-b803369e93a4/go.mod h1:HWpNFd8qFXCoU8gEF/xiuG10ni9EBFhPcpAFTcWDAmc= +github.com/testifysec/archivist-api v0.0.0-20220719182705-980989009502 h1:9y2qBW9yrxDa5R+mHr4uHmPIOIkyjhCD5XzNtynHVMU= +github.com/testifysec/archivist-api v0.0.0-20220719182705-980989009502/go.mod h1:HWpNFd8qFXCoU8gEF/xiuG10ni9EBFhPcpAFTcWDAmc= github.com/testifysec/go-witness v0.1.11 h1:CK5I7g7yu+ObXraYN96KHZu9VmLLs4vKEvfcEi4E35E= github.com/testifysec/go-witness v0.1.11/go.mod h1:EGTMK84vV6/7kiCbJYonESTvaeOW2eMJVHh3mW/EWYU= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= diff --git a/internal/metadatastorage/mysqlstore/mysql.go b/internal/metadatastorage/mysqlstore/mysql.go index f8436a37..2d057871 100644 --- a/internal/metadatastorage/mysqlstore/mysql.go +++ b/internal/metadatastorage/mysqlstore/mysql.go @@ -24,7 +24,6 @@ import ( "ariga.io/sqlcomment" "entgo.io/ent/dialect/sql" - "github.com/git-bom/gitbom-go" "github.com/sirupsen/logrus" "github.com/testifysec/archivist-api/pkg/api/archivist" "github.com/testifysec/archivist/ent" @@ -38,25 +37,15 @@ import ( "github.com/testifysec/go-witness/cryptoutil" "github.com/testifysec/go-witness/dsse" "github.com/testifysec/go-witness/intoto" - "google.golang.org/protobuf/types/known/emptypb" _ "github.com/go-sql-driver/mysql" ) -type UnifiedStorage interface { - archivist.ArchivistServer - archivist.CollectorServer +type Store struct { + client *ent.Client } -type store struct { - archivist.UnimplementedArchivistServer - archivist.UnimplementedCollectorServer - - client *ent.Client - objectStorage archivist.CollectorServer -} - -func NewServer(ctx context.Context, connectionstring string) (UnifiedStorage, <-chan error, error) { +func New(ctx context.Context, connectionstring string) (*Store, <-chan error, error) { drv, err := sql.Open("mysql", connectionstring) if err != nil { return nil, nil, err @@ -92,13 +81,12 @@ func NewServer(ctx context.Context, connectionstring string) (UnifiedStorage, <- logrus.WithContext(ctx).Fatalf("failed creating schema resources: %v", err) } - return &store{ + return &Store{ client: client, }, errCh, nil } -func (s *store) GetBySubjectDigest(request *archivist.GetBySubjectDigestRequest, server archivist.Archivist_GetBySubjectDigestServer) error { - ctx := server.Context() +func (s *Store) GetBySubjectDigest(ctx context.Context, request *archivist.GetBySubjectDigestRequest) (<-chan *archivist.GetBySubjectDigestResponse, error) { statementPredicates := []predicate.Statement{statement.HasSubjectsWith( subject.HasSubjectDigestsWith( subjectdigest.And( @@ -122,26 +110,32 @@ func (s *store) GetBySubjectDigest(request *archivist.GetBySubjectDigestRequest, }).All(ctx) if err != nil { - return err + return nil, err } - for _, curDsse := range res { - response := &archivist.GetBySubjectDigestResponse{} - response.Gitoid = curDsse.GitbomSha256 - response.CollectionName = curDsse.Edges.Statement.Edges.AttestationCollections.Name - for _, curAttestation := range curDsse.Edges.Statement.Edges.AttestationCollections.Edges.Attestations { - response.Attestations = append(response.Attestations, curAttestation.Type) - } + out := make(chan *archivist.GetBySubjectDigestResponse, 1) + go func() { + defer close(out) + for _, curDsse := range res { + response := &archivist.GetBySubjectDigestResponse{} + response.Gitoid = curDsse.GitbomSha256 + response.CollectionName = curDsse.Edges.Statement.Edges.AttestationCollections.Name + for _, curAttestation := range curDsse.Edges.Statement.Edges.AttestationCollections.Edges.Attestations { + response.Attestations = append(response.Attestations, curAttestation.Type) + } - if err := server.Send(response); err != nil { - return err + select { + case <-ctx.Done(): + return + case out <- response: + } } - } + }() - return nil + return out, nil } -func (s *store) withTx(ctx context.Context, fn func(tx *ent.Tx) error) error { +func (s *Store) withTx(ctx context.Context, fn func(tx *ent.Tx) error) error { tx, err := s.client.Tx(ctx) if err != nil { return err @@ -173,41 +167,31 @@ type parsedCollection struct { } `json:"attestations"` } -func (s *store) Store(ctx context.Context, request *archivist.StoreRequest) (*emptypb.Empty, error) { - fmt.Println("STORING") - - obj := request.GetObject() +func (s *Store) Store(ctx context.Context, gitoid string, obj []byte) error { envelope := &dsse.Envelope{} - if err := json.Unmarshal([]byte(obj), envelope); err != nil { - return nil, err + if err := json.Unmarshal(obj, envelope); err != nil { + return err } payloadDigestSet, err := cryptoutil.CalculateDigestSetFromBytes(envelope.Payload, []crypto.Hash{crypto.SHA256}) if err != nil { - return nil, err + return err } payload := &intoto.Statement{} if err := json.Unmarshal(envelope.Payload, payload); err != nil { - return nil, err + return err } parsedCollection := &parsedCollection{} if err := json.Unmarshal(payload.Predicate, parsedCollection); err != nil { - return nil, err - } - - // generate gitbom - gb := gitbom.NewSha256GitBom() - if err := gb.AddReference([]byte(obj), nil); err != nil { - logrus.WithContext(ctx).Errorf("gitbom tag generation failed: %+v", err) - return nil, err + return err } err = s.withTx(ctx, func(tx *ent.Tx) error { dsse, err := tx.Dsse.Create(). SetPayloadType(envelope.PayloadType). - SetGitbomSha256(gb.Identity()). + SetGitbomSha256(gitoid). Save(ctx) if err != nil { return err @@ -288,9 +272,8 @@ func (s *store) Store(ctx context.Context, request *archivist.StoreRequest) (*em if err != nil { logrus.Errorf("unable to store metadata: %+v", err) - return nil, err + return err } - fmt.Println("metadata stored") - return &emptypb.Empty{}, nil + return nil } diff --git a/internal/objectstorage/blobstore/minio.go b/internal/objectstorage/blobstore/minio.go index 84d061f1..00770c37 100644 --- a/internal/objectstorage/blobstore/minio.go +++ b/internal/objectstorage/blobstore/minio.go @@ -20,77 +20,19 @@ import ( "fmt" "io" - "github.com/git-bom/gitbom-go" "github.com/minio/minio-go" "github.com/minio/minio-go/pkg/credentials" - "github.com/sirupsen/logrus" "github.com/testifysec/archivist-api/pkg/api/archivist" - "google.golang.org/protobuf/types/known/emptypb" ) -// Indexer calculates the index reference for an input blob, -// and gets/puts blobs at that index in and out of the backing -// blob storage. -type Indexer interface { - GetRef(obj []byte) (string, error) - GetBlob(idx string) ([]byte, error) - PutBlob(idx string, obj []byte) error -} - -type attestationBlobStore struct { - archivist.UnimplementedCollectorServer - +type Store struct { client *minio.Client bucket string location string } -// GetRef calculates the index reference for a given object -func (store *attestationBlobStore) GetRef(obj []byte) (string, error) { - gb := gitbom.NewSha256GitBom() - if err := gb.AddReference(obj, nil); err != nil { - return "", err - } - return gb.Identity(), nil -} - -// GetBlob retrieves an attesation from the backend store -func (store *attestationBlobStore) GetBlob(idx string) ([]byte, error) { - opt := minio.GetObjectOptions{} - chunkSize := 8 * 1024 - buf := make([]byte, 0, chunkSize) - outBuf := bytes.NewBuffer(buf) - - obj, err := store.client.GetObject(store.bucket, idx, opt) - if err != nil { - return buf, err - } - - var n int64 - for { - _ = opt.SetRange(n, n+int64(chunkSize)-1) - readBytes, err := outBuf.ReadFrom(obj) - if err == nil { - return outBuf.Bytes(), nil - } - if err != nil { - if err == io.EOF { - _, err = outBuf.ReadFrom(bytes.NewReader(buf)) - break - } - } - - n += readBytes - _, err = outBuf.ReadFrom(bytes.NewReader(buf)) - if err != nil { - return buf, fmt.Errorf("failed to chunk blob: %v", err) - } - } - return []byte{}, fmt.Errorf("failed to read out object: %v", err) -} - // PutBlob stores the attestation blob into the backend store -func (store *attestationBlobStore) PutBlob(idx string, obj []byte) error { +func (store *Store) PutBlob(idx string, obj []byte) error { opt := minio.PutObjectOptions{} size := int64(len(obj)) n, err := store.client.PutObject(store.bucket, idx, bytes.NewReader(obj), size, opt) @@ -102,8 +44,8 @@ func (store *attestationBlobStore) PutBlob(idx string, obj []byte) error { return nil } -// NewMinioClient returns a reader/writer for storing/retrieving attestations -func NewMinioClient(ctx context.Context, endpoint, accessKeyId, secretAccessKeyId, bucketName string, useSSL bool) (archivist.CollectorServer, <-chan error, error) { +// New returns a reader/writer for storing/retrieving attestations +func New(ctx context.Context, endpoint, accessKeyId, secretAccessKeyId, bucketName string, useSSL bool) (*Store, <-chan error, error) { errCh := make(chan error) go func() { <-ctx.Done() @@ -128,36 +70,17 @@ func NewMinioClient(ctx context.Context, endpoint, accessKeyId, secretAccessKeyI return nil, errCh, err } - return &attestationBlobStore{ + return &Store{ client: c, location: loc, bucket: bucketName, }, errCh, nil } -func (s *attestationBlobStore) Get(ctx context.Context, req *archivist.GetRequest) (*archivist.GetResponse, error) { - obj, err := s.GetBlob(req.GetGitoid()) - if err != nil { - logrus.WithContext(ctx).Errorf("failed to retrieve object: %+v", err) - return nil, err - } - - return &archivist.GetResponse{ - Gitoid: req.GetGitoid(), - Object: string(obj), - }, nil +func (s *Store) Get(ctx context.Context, req *archivist.GetRequest) (io.ReadCloser, error) { + return s.client.GetObjectWithContext(ctx, s.bucket, req.GetGitoid(), minio.GetObjectOptions{}) } -func (s *attestationBlobStore) Store(ctx context.Context, req *archivist.StoreRequest) (*emptypb.Empty, error) { - idx, err := s.GetRef([]byte(req.GetObject())) - if err != nil { - logrus.WithContext(ctx).Errorf("gitbom tag generation failed: %+v", err) - return nil, err - } - - if err := s.PutBlob(idx, []byte(req.GetObject())); err != nil { - logrus.WithContext(ctx).Errorf("failed to store object: %+v", err) - } - - return &emptypb.Empty{}, nil +func (s *Store) Store(ctx context.Context, gitoid string, payload []byte) error { + return s.PutBlob(gitoid, payload) } diff --git a/internal/objectstorage/filestore/file.go b/internal/objectstorage/filestore/file.go index 02f81709..13cadea9 100644 --- a/internal/objectstorage/filestore/file.go +++ b/internal/objectstorage/filestore/file.go @@ -2,65 +2,38 @@ package filestore import ( "context" - "fmt" - "io/ioutil" + "io" "log" "net/http" + "os" "path/filepath" - "github.com/git-bom/gitbom-go" "github.com/gorilla/handlers" - "github.com/sirupsen/logrus" "github.com/testifysec/archivist-api/pkg/api/archivist" - "google.golang.org/protobuf/types/known/emptypb" ) -type store struct { - archivist.UnimplementedCollectorServer - +type Store struct { prefix string } -func NewServer(ctx context.Context, directory string, address string) (archivist.CollectorServer, <-chan error, error) { +func New(ctx context.Context, directory string, address string) (*Store, <-chan error, error) { errCh := make(chan error) go func() { - server := handlers.CompressHandler(http.FileServer(http.Dir(directory))) log.Fatalln(http.ListenAndServe(address, server)) - <-ctx.Done() close(errCh) }() - return &store{ + return &Store{ prefix: directory, }, errCh, nil } -func (s *store) Get(ctx context.Context, request *archivist.GetRequest) (*archivist.GetResponse, error) { - res, err := ioutil.ReadFile(filepath.Join(s.prefix, request.GetGitoid())) - if err != nil { - logrus.WithContext(ctx).Errorf("failed to retrieve object: %+v", err) - return nil, err - } - return &archivist.GetResponse{ - Gitoid: request.GetGitoid(), - Object: string(res), - }, nil +func (s *Store) Get(ctx context.Context, request *archivist.GetRequest) (io.ReadCloser, error) { + return os.Open(filepath.Join(s.prefix, request.GetGitoid()+".json")) } -func (s *store) Store(ctx context.Context, request *archivist.StoreRequest) (*emptypb.Empty, error) { - // TODO refactor this to use common code - gb := gitbom.NewSha256GitBom() - if err := gb.AddReference([]byte(request.Object), nil); err != nil { - logrus.WithContext(ctx).Errorf("gitbom tag generation failed: %+v", err) - return nil, err - } - - fmt.Printf("Writing: %s/%s\n", s.prefix, gb.Identity()) - err := ioutil.WriteFile(filepath.Join(s.prefix, gb.Identity()+".json"), []byte(request.Object), 0644) - if err != nil { - return nil, err - } - return &emptypb.Empty{}, nil +func (s *Store) Store(ctx context.Context, gitoid string, payload []byte) error { + return os.WriteFile(filepath.Join(s.prefix, gitoid+".json"), payload, 0644) } diff --git a/internal/server/server.go b/internal/server/server.go index 465f71a4..43ce4a3a 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -15,60 +15,141 @@ package server import ( + "bufio" "context" - "fmt" + "io" + + "github.com/git-bom/gitbom-go" "github.com/sirupsen/logrus" "github.com/testifysec/archivist-api/pkg/api/archivist" - "google.golang.org/protobuf/types/known/emptypb" ) +const ChunkSize = 64 * 1024 //64kb seems to be somewhat of an agreed upon message size when streaming: https://github.com/grpc/grpc.github.io/issues/371 + type archivistServer struct { archivist.UnimplementedArchivistServer - store archivist.ArchivistServer + store MetadataStorer } -func NewArchivistServer(store archivist.ArchivistServer) archivist.ArchivistServer { +func NewArchivistServer(store MetadataStorer) archivist.ArchivistServer { return &archivistServer{ store: store, } } func (s *archivistServer) GetBySubjectDigest(request *archivist.GetBySubjectDigestRequest, server archivist.Archivist_GetBySubjectDigestServer) error { - ctx := server.Context() + ctx, cancel := context.WithCancel(server.Context()) + defer cancel() logrus.WithContext(ctx).Printf("retrieving by subject... ") - return s.store.GetBySubjectDigest(request, server) + responses, err := s.store.GetBySubjectDigest(ctx, request) + if err != nil { + return err + } + + for response := range responses { + if err := server.Send(response); err != nil { + return err + } + } + + return nil } type collectorServer struct { archivist.UnimplementedCollectorServer - metadataStore archivist.CollectorServer - objectStore archivist.CollectorServer + metadataStore MetadataStorer + objectStore ObjectStorer } -func NewCollectorServer(metadataStore, objectStore archivist.CollectorServer) archivist.CollectorServer { +type Storer interface { + Store(context.Context, string, []byte) error +} + +type MetadataStorer interface { + Storer + GetBySubjectDigest(context.Context, *archivist.GetBySubjectDigestRequest) (<-chan *archivist.GetBySubjectDigestResponse, error) +} + +type ObjectStorer interface { + Storer + Get(context.Context, *archivist.GetRequest) (io.ReadCloser, error) +} + +func NewCollectorServer(metadataStore MetadataStorer, objectStore ObjectStorer) archivist.CollectorServer { return &collectorServer{ objectStore: objectStore, metadataStore: metadataStore, } } -func (s *collectorServer) Store(ctx context.Context, request *archivist.StoreRequest) (*emptypb.Empty, error) { - fmt.Println("middleware: store") - if _, err := s.metadataStore.Store(ctx, request); err != nil { +func (s *collectorServer) Store(server archivist.Collector_StoreServer) error { + ctx := server.Context() + payload := make([]byte, 0) + for { + c, err := server.Recv() + if err == io.EOF { + break + } + + if err != nil { + return err + } + + payload = append(payload, c.GetChunk()...) + } + + // generate gitbom + gb := gitbom.NewSha256GitBom() + if err := gb.AddReference(payload, nil); err != nil { + logrus.WithContext(ctx).Errorf("gitbom tag generation failed: %+v", err) + return err + } + + gitoid := gb.Identity() + if err := s.metadataStore.Store(ctx, gitoid, payload); err != nil { logrus.WithContext(ctx).Printf("received error from metadata store: %+v", err) - return nil, err + return err } - if _, err := s.objectStore.Store(ctx, request); err != nil { - logrus.WithContext(ctx).Printf("received error from object store: %+v", err) - return nil, err + if s.objectStore != nil { + if err := s.objectStore.Store(ctx, gitoid, payload); err != nil { + logrus.WithContext(ctx).Printf("received error from object store: %+v", err) + return err + } } - return &emptypb.Empty{}, nil + return server.SendAndClose(&archivist.StoreResponse{Gitoid: gitoid}) } -func (s *collectorServer) Get(ctx context.Context, request *archivist.GetRequest) (*archivist.GetResponse, error) { - return s.objectStore.Get(ctx, request) +func (s *collectorServer) Get(request *archivist.GetRequest, server archivist.Collector_GetServer) error { + if s.objectStore == nil { + return s.UnimplementedCollectorServer.Get(request, server) + } + + objReader, err := s.objectStore.Get(server.Context(), request) + defer objReader.Close() + if err != nil { + return err + } + + chunk := &archivist.Chunk{} + buf := make([]byte, ChunkSize) + r := bufio.NewReaderSize(objReader, ChunkSize) + for { + n, err := io.ReadFull(r, buf) + if err == io.EOF { + break + } else if err != nil && err != io.ErrUnexpectedEOF { + return err + } + + chunk.Chunk = buf[:n] + if err := server.Send(chunk); err != nil { + return err + } + } + + return nil }