Skip to content

Commit

Permalink
Separate read and write storage backend for mirror
Browse files Browse the repository at this point in the history
Allow district read side and write side configuration for ad mirror.

This enables a storage backend to be passively populated from a
retrieval backend since ingest will attempt to store all processed ads
regardless of their retrieval origin.
  • Loading branch information
masih committed Feb 15, 2024
1 parent 8a2176d commit be919f2
Show file tree
Hide file tree
Showing 8 changed files with 48 additions and 38 deletions.
4 changes: 3 additions & 1 deletion config/mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ type Mirror struct {
// Compress specifies how to compress files. One of: "gzip", "none".
// Defaults to "gzip" if unspecified.
Compress string
// Storage configures the backing file store for the mirror.
// Storage configures the backing file store for the mirror write operations.
Storage filestore.Config
// Retrieval configures the backing file store for the mirror read operations.
Retrieval filestore.Config
}

// NewMirror returns Mirror with values set to their defaults.
Expand Down
17 changes: 15 additions & 2 deletions filestore/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ import (
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/aws/smithy-go"
"github.com/ipfs/go-log/v2"
)

var s3logger = log.Logger("filestore/s3")

// S3 is a file store that stores files in AWS S3.
//
// The region is set by environment variable and authentication is done by
Expand Down Expand Up @@ -96,14 +99,17 @@ func (s *S3) Get(ctx context.Context, relPath string) (*File, io.ReadCloser, err
if err != nil {
var nsk *types.NoSuchKey
if errors.As(err, &nsk) {
s3logger.Debugw("Cannot perform GET: no such key", "key", relPath)
return nil, nil, fs.ErrNotExist
}
var apiErr smithy.APIError
if errors.As(err, &apiErr) {
if apiErr.ErrorCode() == "NotFound" {
s3logger.Debugw("Cannot perform GET: API error not found", "key", relPath)
return nil, nil, fs.ErrNotExist
}
}
s3logger.Errorw("Failed to perform GET", "key", relPath, "err", err)
return nil, nil, err
}

Expand All @@ -115,6 +121,7 @@ func (s *S3) Get(ctx context.Context, relPath string) (*File, io.ReadCloser, err
file.Modified = *rsp.LastModified
}

s3logger.Debugw("Successfully performed GET", "key", relPath)
return file, &wrappedReadCloser{rsp.Body}, nil
}

Expand All @@ -128,17 +135,21 @@ func (s *S3) Head(ctx context.Context, relPath string) (*File, error) {
var apiErr smithy.APIError
if errors.As(err, &apiErr) {
if apiErr.ErrorCode() == "NotFound" {
s3logger.Debugw("Cannot perform HEAD: API error not found", "key", relPath)
return nil, fs.ErrNotExist
}
}
var nsk *types.NoSuchKey
if errors.As(err, &nsk) {
s3logger.Debugw("Cannot perform HEAD: no such key", "key", relPath)
return nil, fs.ErrNotExist
}
var nf *types.NotFound
if errors.As(err, &nf) {
s3logger.Debugw("Cannot perform HEAD: not found", "key", relPath)
return nil, fs.ErrNotExist
}
s3logger.Errorw("Failed to perform HEAD", "key", relPath, "err", err)
return nil, err
}

Expand All @@ -149,7 +160,7 @@ func (s *S3) Head(ctx context.Context, relPath string) (*File, error) {
if rsp.LastModified != nil {
file.Modified = *rsp.LastModified
}

s3logger.Debugw("Successfully performed HEAD", "key", relPath)
return file, nil
}

Expand Down Expand Up @@ -220,15 +231,17 @@ func (s *S3) Put(ctx context.Context, relPath string, reader io.Reader) (*File,
ContentType: aws.String("application/octet-stream"),
})
if err != nil {
s3logger.Errorw("Failed to perform PUT", "key", relPath, "err", err)
return nil, err
}

file, err := s.Head(ctx, relPath)
if err != nil {
s3logger.Errorw("Failed to perform HEAD as part of PUT", "key", relPath, "err", err)
return nil, err
}
file.URL = rsp.Location

s3logger.Debugw("Successfully performed PUT", "key", relPath)
return file, nil
}

Expand Down
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ require (
github.com/ipld/go-ipld-prime v0.21.0
github.com/ipld/go-ipld-prime/storage/dsadapter v0.0.0-20230102063945-1a409dc236dd
github.com/ipni/go-indexer-core v0.8.7
github.com/ipni/go-libipni v0.5.11
github.com/ipni/go-libipni v0.5.12
github.com/libp2p/go-libp2p v0.32.2
github.com/libp2p/go-msgio v0.3.0
github.com/mitchellh/go-homedir v1.1.0
Expand Down Expand Up @@ -204,7 +204,6 @@ require (
go.uber.org/fx v1.20.1 // indirect
go.uber.org/mock v0.4.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go4.org v0.0.0-20230225012048-214862532bf5 // indirect
golang.org/x/crypto v0.18.0 // indirect
golang.org/x/exp v0.0.0-20240103183307-be819d1f06fc // indirect
golang.org/x/mod v0.14.0 // indirect
Expand Down
10 changes: 3 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -594,8 +594,8 @@ github.com/ipld/go-ipld-prime/storage/dsadapter v0.0.0-20230102063945-1a409dc236
github.com/ipld/go-ipld-prime/storage/dsadapter v0.0.0-20230102063945-1a409dc236dd/go.mod h1:9DD/GM0JNPoisgR09F62kbBi7kHa4eDIea4XshXYOVc=
github.com/ipni/go-indexer-core v0.8.7 h1:IaEBoVe1RiTBDTj8MiUlbsYk/L32AOjtCBhQQmXuxIo=
github.com/ipni/go-indexer-core v0.8.7/go.mod h1:lLWTrQ7dhKwCak1qn6AQBNoSrGYBAXJJy2OGc02BO2Q=
github.com/ipni/go-libipni v0.5.11 h1:h79WCuLwAekIQpw09lzJ5FDc432FtEt1pxHnJavDs10=
github.com/ipni/go-libipni v0.5.11/go.mod h1:c8mHa6J9iFREpDB29GlPIsbvztRq6bnhg5zJKrnvdUg=
github.com/ipni/go-libipni v0.5.12 h1:skNIaCoGv2kXR4HCii0P1FjSUL5U7thSjGWnfdgw568=
github.com/ipni/go-libipni v0.5.12/go.mod h1:c8mHa6J9iFREpDB29GlPIsbvztRq6bnhg5zJKrnvdUg=
github.com/iris-contrib/blackfriday v2.0.0+incompatible/go.mod h1:UzZ2bDEoaSGPbkg6SAB4att1aAwTmVIx/5gCVqeyUdI=
github.com/iris-contrib/go.uuid v2.0.0+incompatible/go.mod h1:iz2lgM/1UnEf1kP0L/+fafWORmlnuysV2EMP8MW+qe0=
github.com/iris-contrib/i18n v0.0.0-20171121225848-987a633949d0/go.mod h1:pMCz62A0xJL6I+umB2YTlFRwWXaDFA0jy+5HzGiJjqI=
Expand Down Expand Up @@ -1300,9 +1300,8 @@ go.uber.org/zap v1.19.1/go.mod h1:j3DNczoxDZroyBnOT1L/Q79cfUMGZxlv/9dzN7SM1rI=
go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo=
go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so=
go4.org v0.0.0-20180809161055-417644f6feb5/go.mod h1:MkTOUMDaeVYJUOUsaDXIhWPZYa1yOyC1qaOBpL57BhE=
go4.org v0.0.0-20200411211856-f5505b9728dd h1:BNJlw5kRTzdmyfh5U8F93HA2OwkP7ZGwA51eJ/0wKOU=
go4.org v0.0.0-20200411211856-f5505b9728dd/go.mod h1:CIiUVy99QCPfoE13bO4EZaz5GZMZXMSBGhxRdsvzbkg=
go4.org v0.0.0-20230225012048-214862532bf5 h1:nifaUDeh+rPaBCMPMQHZmvJf+QdpLFnuQPwx+LxVmtc=
go4.org v0.0.0-20230225012048-214862532bf5/go.mod h1:F57wTi5Lrj6WLyswp5EYV1ncrEbFGHD4hhz6S1ZYeaU=
golang.org/x/build v0.0.0-20190111050920-041ab4dc3f9d/go.mod h1:OWs+y06UdEOHN4y+MfF/py+xQ/tYqIWW03b70/CG9Rw=
golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
Expand Down Expand Up @@ -1427,7 +1426,6 @@ golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su
golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo=
golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
Expand Down Expand Up @@ -1541,7 +1539,6 @@ golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXR
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand All @@ -1550,7 +1547,6 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down
2 changes: 2 additions & 0 deletions internal/ingest/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1604,6 +1604,8 @@ func TestGetEntryDataFromCar(t *testing.T) {
cfgWithMirror.AdvertisementMirror.Write = true
cfgWithMirror.AdvertisementMirror.Storage.Type = "local"
cfgWithMirror.AdvertisementMirror.Storage.Local.BasePath = tempDir
cfgWithMirror.AdvertisementMirror.Retrieval.Type = "local"
cfgWithMirror.AdvertisementMirror.Retrieval.Local.BasePath = tempDir

te := setupTestEnv(t, true, func(optCfg *testEnvOpts) {
optCfg.ingestConfig = &cfgWithMirror
Expand Down
46 changes: 22 additions & 24 deletions internal/ingest/mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ingest
import (
"context"
"fmt"
"reflect"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
Expand Down Expand Up @@ -47,36 +48,33 @@ func (m adMirror) readWriteSame() bool {

func newMirror(cfgMirror config.Mirror, dstore datastore.Batching) (adMirror, error) {
var m adMirror

if !(cfgMirror.Read || cfgMirror.Write) {
return m, nil
}

fileStore, err := filestore.MakeFilestore(cfgMirror.Storage)
if err != nil {
return m, fmt.Errorf("cannot create car file storage for mirror: %w", err)
}
if fileStore == nil {
return m, nil
}

if cfgMirror.Write {
m.carWriter, err = carstore.NewWriter(dstore, fileStore, carstore.WithCompress(cfgMirror.Compress))
if err != nil {
return m, fmt.Errorf("cannot create car file writer: %w", err)
switch writeStore, err := filestore.MakeFilestore(cfgMirror.Storage); {
case err != nil:
return m, fmt.Errorf("cannot create car file storage for mirror: %w", err)
case writeStore != nil:
m.carWriter, err = carstore.NewWriter(dstore, writeStore, carstore.WithCompress(cfgMirror.Compress))
if err != nil {
return m, fmt.Errorf("cannot create mirror car file writer: %w", err)
}
default:
log.Warnw("Mirror write is enabled with no storage backend", "backendType", cfgMirror.Storage.Type)
}
}

if cfgMirror.Read {
m.carReader, err = carstore.NewReader(fileStore, carstore.WithCompress(cfgMirror.Compress))
if err != nil {
return m, fmt.Errorf("cannot create car file reader: %w", err)
}

if m.carWriter != nil { // TODO: && rdFileStore == wrFileStore {
m.rdWrSame = true
switch readStore, err := filestore.MakeFilestore(cfgMirror.Retrieval); {
case err != nil:
return m, fmt.Errorf("cannot create car file retrieval for mirror: %w", err)
case readStore != nil:
m.carReader, err = carstore.NewReader(readStore, carstore.WithCompress(cfgMirror.Compress))
if err != nil {
return m, fmt.Errorf("cannot create mirror car file reader: %w", err)
}
default:
log.Warnw("Mirror read is enabled with no retrieval backend", "backendType", cfgMirror.Retrieval.Type)
}
}

m.rdWrSame = m.carWriter != nil && m.carReader != nil && reflect.DeepEqual(cfgMirror.Storage, cfgMirror.Retrieval)
return m, nil
}
2 changes: 1 addition & 1 deletion ipni-gc/cmd/ipnigc/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func daemonAction(cctx *cli.Context) error {

var fileStore filestore.Interface
cfgMirror := cfg.Ingest.AdvertisementMirror
if cfgMirror.Read || cfgMirror.Write {
if cfgMirror.Write {
fileStore, err = filestore.MakeFilestore(cfgMirror.Storage)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion ipni-gc/cmd/ipnigc/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func providerAction(cctx *cli.Context) error {

var fileStore filestore.Interface
cfgMirror := cfg.Ingest.AdvertisementMirror
if cfgMirror.Read || cfgMirror.Write {
if cfgMirror.Write {
fileStore, err = filestore.MakeFilestore(cfgMirror.Storage)
if err != nil {
return err
Expand Down

0 comments on commit be919f2

Please sign in to comment.